Hi,

The Spark master is working, and I have given the same URL in the code (screenshot attached).
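One low-tech way to double-check this: the standalone master only accepts drivers that use exactly the spark:// URL it advertises in its web UI, so the two strings can be compared character by character. A minimal sketch, where both URL values are placeholders taken from the code in this thread (copy the real one from http://<master>:8080):

```shell
# The master URL hard-coded in JavaStreamingContext must match, character
# for character, the spark:// URL shown at the top left of the master web
# UI (default port 8080). Even an IP-vs-hostname difference counts as a
# mismatch. Both values below are placeholders from this thread.
CODE_URL="spark://192.168.88.130:7077"   # as written in the Java code
UI_URL="spark://192.168.88.130:7077"     # copy-paste from the web UI

if [ "$CODE_URL" = "$UI_URL" ]; then
    echo "URLs match"
else
    echo "MISMATCH: the driver will never register with this master"
fi
```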
The warning is gone, and the new log is:

-------------------------------------------
Time: 1417427850000 ms
-------------------------------------------
INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Starting job streaming job 1417427850000 ms.0 from job set of time 1417427850000 ms
INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 1417427850000 ms.0 from job set of time 1417427850000 ms
INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.028 s for time 1417427850000 ms (execution: 0.001 s)
INFO [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Added jobs for time 1417427850000 ms
INFO [sparkDriver-akka.actor.default-dispatcher-5] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 25 from persistence list
INFO [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 25
INFO [sparkDriver-akka.actor.default-dispatcher-5] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 24 from persistence list
INFO [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 24
INFO [sparkDriver-akka.actor.default-dispatcher-5] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[24] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417427850000 ms
INFO [sparkDriver-akka.actor.default-dispatcher-4] scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks
-------------------------------------------
Time: 1417427853000 ms
-------------------------------------------
INFO [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Starting job streaming job 1417427853000 ms.0 from job set of time 1417427853000 ms
INFO [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 1417427853000 ms.0 from job set of time 1417427853000 ms
INFO [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.015 s for time 1417427853000 ms (execution: 0.001 s)
INFO [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Added jobs for time 1417427853000 ms
INFO [sparkDriver-akka.actor.default-dispatcher-4] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 27 from persistence list
INFO [sparkDriver-akka.actor.default-dispatcher-5] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 27
INFO [sparkDriver-akka.actor.default-dispatcher-4] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 26 from persistence list
INFO [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 26
INFO [sparkDriver-akka.actor.default-dispatcher-4] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[26] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417427853000 ms
INFO [sparkDriver-akka.actor.default-dispatcher-6] scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks

What should be my approach now? I need urgent help.

Regards,
Aiman

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Monday, December 01, 2014 3:56 PM
To: Sarosh, M.
Cc: user@spark.apache.org
Subject: Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

It says:

14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

A quick guess would be that you are giving the wrong master URL
(spark://192.168.88.130:7077). Open the web UI running on port 8080 and use the master URL listed there in the top-left corner of the page.

Thanks
Best Regards

On Mon, Dec 1, 2014 at 3:42 PM, <m.sar...@accenture.com> wrote:

Hi,

I am integrating Kafka and Spark, using spark-streaming. I have created a topic as a Kafka producer:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

I am publishing messages to Kafka and trying to read them using spark-streaming Java code and display them on screen.
The daemons are all up: Spark master, worker; ZooKeeper; Kafka.
I am writing Java code for this, using KafkaUtils.createStream. The code is below:

package com.spark;

import scala.Tuple2;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import java.util.Map;
import java.util.HashMap;

public class SparkStream {
    public static void main(String args[]) {
        if (args.length != 3) {
            System.out.println("Usage: spark-submit --class com.spark.SparkStream target/SparkStream-with-dependencies.jar <zookeeper_ip> <group_name> <topic1,topic2,...>");
            System.exit(1);
        }

        // One receiver thread per topic
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topic = args[2].split(",");
        for (String t : topic) {
            topicMap.put(t, new Integer(1));
        }

        JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
        System.out.println("Connection done");

        JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> message) {
                System.out.println("NewMessage: " + message._2()); // for debugging
                return message._2();
            }
        });
        data.print();

        jssc.start();
        jssc.awaitTermination();
    }
}

I am running the job, and at another terminal I am running the Kafka console producer to publish messages:

#bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> Hi kafka
> second message
> another message

But the output log at the spark-streaming console doesn't show the messages; it shows zero blocks received:

-------------------------------------------
Time: 1417107363000 ms
-------------------------------------------
14/11/27 11:56:03 INFO scheduler.JobScheduler: Starting job streaming job 1417107363000 ms.0 from job set of time 1417107363000 ms
14/11/27 11:56:03 INFO scheduler.JobScheduler: Finished job streaming job 1417107363000 ms.0 from job set of time 1417107363000 ms
14/11/27 11:56:03 INFO scheduler.JobScheduler: Total delay: 0.008 s for time 1417107363000 ms (execution: 0.000 s)
14/11/27 11:56:03 INFO scheduler.JobScheduler: Added jobs for time 1417107363000 ms
14/11/27 11:56:03 INFO rdd.BlockRDD: Removing RDD 13 from persistence list
14/11/27 11:56:03 INFO storage.BlockManager: Removing RDD 13
14/11/27 11:56:03 INFO kafka.KafkaInputDStream: Removing blocks of RDD BlockRDD[13] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417107363000 ms
14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/11/27 11:56:06 INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks

Why isn't the data block getting received?
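One way to take the hard-coded URL in the code above out of the picture is to pass the master on the command line instead. This sketch assumes the constructor is changed to build the context from a SparkConf (e.g. `new JavaStreamingContext(new SparkConf(), new Duration(3000))`) so that spark-submit's `--master` flag is honored; the consumer group name `mygroup` is a placeholder, and the `echo` only prints the command rather than submitting:

```shell
# Sketch of a spark-submit invocation that supplies the master URL on the
# command line instead of hard-coding it in the Java source. Assumes the
# code reads its master from SparkConf. Remove the echo to actually submit.
MASTER_URL="spark://192.168.88.130:7077"   # use the value from the master web UI
echo bin/spark-submit \
    --class com.spark.SparkStream \
    --master "$MASTER_URL" \
    target/SparkStream-with-dependencies.jar localhost:2181 mygroup test
```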
I have tried using the Kafka producer and consumer on the console (bin/kafka-console-producer... and bin/kafka-console-consumer...), and that works perfectly, so why not the code above? Please help me.

Regards,
Aiman Sarosh