I am using the latest version of kafka.
package kafka.test;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/** * Melodi Kafka Broker. * TODO KB: This is not final. */public class 
MelodiKafkaBroker implements InitializingBean, DisposableBean{  public static 
final String KAFKA_LOCALHOST_BROKER = "127.0.0.1:9092";   public static final 
String NBV_BATCH_TOPIC = "nbv-batch-topic"; public static final String 
KAFKA_LOG_DIR = "c:/temp/kafka";     public static final int KAFKA_PORT = 9092; 
     public static final int KAFKA_BROKER_ID = 0;
        public static final String ZK_LOG_DIR = "c:/temp/zookeeper";    public 
static final String ZK_PORT = "9093";
        /**      * The actual kafka server.      */     private KafkaServer 
kafkaServer;
        /**      * ZooKeeper config.     */     private ServerConfig zkConfig;
        /**      * ZooKeeper server.     */     private ZooKeeperServerMain 
zkServer;
        //      // BEAN INIT.   //
        @Override       public void afterPropertiesSet() throws Exception       
{               // start a zookeeper.           zkConfig = new ServerConfig();  
        zkConfig.parse(new String[] { ZK_PORT, ZK_LOG_DIR });           
zkServer = new ZooKeeperServerMain();
                new Thread()            {                       public void 
run()                       {                               try                 
            {                                       
System.out.println("starting zookeeper");
                                        zkServer.runFromConfig(zkConfig);       
                        }                               catch (IOException e)   
                        {                                       
System.out.println("ZooKeeper Failed");                                 
e.printStackTrace(System.err);                          }                       
}               }.start();
                // start a kafka                new Thread()            {       
                public void run()                       {                       
        try                             {                                       
System.out.println("starting kafka");
                                        Properties properties = new 
Properties();                                       properties.put("port", 
KAFKA_PORT + "");                                        
properties.put("broker.id", KAFKA_BROKER_ID + "");                              
        properties.put("log.dir", KAFKA_LOG_DIR);                               
        properties.put("zookeeper.connect", "127.0.0.1:9093");
                                        kafkaServer = new KafkaServer(new 
KafkaConfig(properties), KafkaServer.$lessinit$greater$default$2());            
                      kafkaServer.startup();                          }         
                      catch (Exception e)                             {         
                              System.out.println("Kafka Failed");               
                      e.printStackTrace(System.err);                          } 
                      }               }.start();
                // start consumers              System.out.println("starting 
consumers....");
                Properties props = new Properties();            
props.put("zookeeper.connect", "127.0.0.1:9093");               
props.put("group.id", "nbv-group");             
props.put("zookeeper.session.timeout.ms", "500");               
props.put("zookeeper.sync.time.ms", "250");             
props.put("auto.commit.interval.ms", "1000");
                ConsumerConnector consumer = 
Consumer.createJavaConsumerConnector(                              new 
ConsumerConfig(props));
                Map<String, Integer> topicCountMap = new HashMap<String, 
Integer>();            topicCountMap.put(NBV_BATCH_TOPIC, new Integer(1));      
       Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 
consumer.createMessageStreams(topicCountMap);              
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(NBV_BATCH_TOPIC);
                // now launch all the threads           //
                ExecutorService executor = Executors.newFixedThreadPool(1);
                // now create an object to consume the messages         //      
        int threadNumber = 0;           for (final KafkaStream stream : 
streams)                {                       executor.submit(new 
ConsumerTest(stream, threadNumber));                        threadNumber++;     
    }       }
        @Override       public void destroy() throws Exception  {               
kafkaServer.shutdown();         System.out.println("kafka stop");       }
        //      // PUBLIC INTERFACE     //
        public static void main(String[] args) throws Exception {               
MelodiKafkaBroker broker = new MelodiKafkaBroker();             
broker.afterPropertiesSet();
                System.out.println("starting a producer....");
                Properties props = new Properties();            
props.put("metadata.broker.list", KAFKA_LOCALHOST_BROKER);              
props.put("zookeeper.connect", "127.0.0.1:9093");               
props.put("serializer.class", "kafka.serializer.StringEncoder");                
ProducerConfig config = new ProducerConfig(props);              
Producer<String, String> kafkaProducer = new Producer<String, String>(config);
                System.out.println("Producing messages....");           for 
(int i = 0; i < 10; i++)            {                       
System.out.println("message to send......");                    
kafkaProducer.send(new KeyedMessage<String, String>(NBV_BATCH_TOPIC, "hello 
kafka " + i + "."));                        System.out.println("message 
sent......");               }       }
        public static class ConsumerTest implements Runnable    {               
private KafkaStream stream;             private int id;
                public ConsumerTest(KafkaStream stream, int id)         {       
                this.id = id;                   this.stream = stream;           
}
                public void run()               {                       
ConsumerIterator<byte[], byte[]> it = stream.iterator();                        
while (it.hasNext())                    {                               
System.out.println("Thread " + id + ": " + new String(it.next().message()));    
                }
                        System.out.println("Shutting down Thread: " + id);      
        }       }}                                        

Reply via email to