Hi, I am integrating Kafka and Spark using Spark Streaming. I have created a topic with the Kafka topic tool:
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic test

I am publishing messages to Kafka and trying to read them with Spark Streaming Java code and print them on screen. All the daemons are up: Spark master and worker, ZooKeeper, and Kafka. The code, using KafkaUtils.createStream, is below:

package com.spark;

import scala.Tuple2;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import java.util.HashMap;
import java.util.Map;

public class SparkStream {
    public static void main(String[] args) {
        if (args.length != 3) {
            System.out.println("Usage: spark-submit --class com.spark.SparkStream "
                    + "target/SparkStream-with-dependencies.jar "
                    + "<zookeeper_ip> <group_name> <topic1,topic2,...>");
            System.exit(1);
        }

        // One receiver thread per topic
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        for (String topic : args[2].split(",")) {
            topicMap.put(topic, 1);
        }

        JavaStreamingContext jssc = new JavaStreamingContext(
                "spark://192.168.88.130:7077", "SparkStream", new Duration(3000));

        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
        System.out.println("Connection done");

        // Extract the message value from each (key, value) pair
        JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> message) {
                System.out.println("NewMessage: " + message._2()); // for debugging
                return message._2();
            }
        });
        data.print();

        jssc.start();
        jssc.awaitTermination();
    }
}

I run the job, and in another terminal I run the Kafka console producer to publish messages:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> Hi kafka
> second message
> another message

But the output log at the Spark Streaming console doesn't show the messages; it shows zero blocks received:

-------------------------------------------
Time: 1417107363000 ms
-------------------------------------------
14/11/27 11:56:03 INFO scheduler.JobScheduler: Starting job streaming job 1417107363000 ms.0 from job set of time 1417107363000 ms
14/11/27 11:56:03 INFO scheduler.JobScheduler: Finished job streaming job 1417107363000 ms.0 from job set of time 1417107363000 ms
14/11/27 11:56:03 INFO scheduler.JobScheduler: Total delay: 0.008 s for time 1417107363000 ms (execution: 0.000 s)
14/11/27 11:56:03 INFO scheduler.JobScheduler: Added jobs for time 1417107363000 ms
14/11/27 11:56:03 INFO rdd.BlockRDD: Removing RDD 13 from persistence list
14/11/27 11:56:03 INFO storage.BlockManager: Removing RDD 13
14/11/27 11:56:03 INFO kafka.KafkaInputDStream: Removing blocks of RDD BlockRDD[13] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417107363000 ms
14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/11/27 11:56:06 INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks

Why isn't any data block being received? I have tried the console producer and consumer (bin/kafka-console-producer.sh and bin/kafka-console-consumer.sh) on the same topic and they work perfectly, so why not the code above? Please help.

Regards,
Aiman Sarosh
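P.S. To rule out a bug in my argument parsing, I checked the topic-map construction in isolation, without Spark. This is a stand-alone sketch with a hypothetical TopicMapDemo class (plain JDK, mirroring the loop in main above):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone check of the topic-list parsing used in SparkStream.
public class TopicMapDemo {
    // Maps each comma-separated topic name to one receiver thread,
    // the same shape KafkaUtils.createStream expects for its topic map.
    static Map<String, Integer> buildTopicMap(String csv) {
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        for (String topic : csv.split(",")) {
            topicMap.put(topic, 1);
        }
        return topicMap;
    }

    public static void main(String[] args) {
        // e.g. the third spark-submit argument: "test,logs"
        System.out.println(buildTopicMap("test,logs"));
    }
}
```

The map comes out as expected, so the arguments reach the stream correctly.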
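P.P.S. Given the "Initial job has not accepted any resources" warning, one thing I still want to rule out (my assumption, not yet verified) is that the receiver itself occupies a core, leaving nothing free to process blocks. My plan is to change the hardcoded master in the code from spark://192.168.88.130:7077 to local[2] (two threads: one for the Kafka receiver, one for processing) and run the same jar locally to take the cluster out of the picture; "test-group" below is just a placeholder consumer group name:

```shell
# Sanity check (assumption): with the master changed to local[2] in the code,
# run the jar locally -- args are <zookeeper_ip> <group_name> <topics>.
bin/spark-submit --class com.spark.SparkStream \
    target/SparkStream-with-dependencies.jar \
    localhost:2181 test-group test
```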