Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks
I see you have no worker machines to execute the job. [inline screenshot: Spark master web UI showing no registered workers]

You haven't configured your Spark cluster properly. A quick fix to get it running would be to run it in local mode. For that, change this line:

    JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077", "SparkStream", new Duration(3000));

to this:

    JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(3000));

Thanks
Best Regards

On Mon, Dec 1, 2014 at 4:18 PM, wrote:

> Hi,
>
> The spark master is working, and I have given the same url in the code:
>
> The warning is gone, and the new log is:
>
> ---
> Time: 141742785 ms
> ---
>
> INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Starting job streaming job 141742785 ms.0 from job set of time 141742785 ms
> INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 141742785 ms.0 from job set of time 141742785 ms
> INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.028 s for time 141742785 ms (execution: 0.001 s)
> INFO [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Added jobs for time 141742785 ms
> INFO [sparkDriver-akka.actor.default-dispatcher-5] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 25 from persistence list
> INFO [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 25
> INFO [sparkDriver-akka.actor.default-dispatcher-5] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 24 from persistence list
> INFO [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 24
> INFO [sparkDriver-akka.actor.default-dispatcher-5] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[24] at BlockRDD at ReceiverInputDStream.scala:69 of time 141742785 ms
> INFO [sparkDriver-akka.actor.default-dispatcher-4] scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks
>
> ---
> Time: 1417427853000 ms
> ---
>
> [snip: the batch for time 1417427853000 ms follows the same pattern (RDDs 26/27), again ending with "Stream 0 received 0 blocks"]
>
> What should be my approach now? Need urgent help.
>
> Regards,
> Aiman
>
> From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
> Sent: Monday, December 01, 2014 3:56 PM
> To: Sarosh, M.
> Cc: user@spark.apache.org
> Subject: Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks
>
> It says:
>
> 14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not
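Side note (not part of the original thread): the fix above works because "local[4]" runs the driver and four executor threads in a single JVM, so the streaming job gets resources even with zero registered workers. A minimal, hypothetical helper sketching that fallback; the class name and the idea of falling back from a configured value are illustrative, not Spark API:

```java
// Hypothetical sketch: fall back to local mode when no cluster master
// URL is configured. "local[4]" runs Spark in-process with 4 threads,
// so no separate workers need to be registered for the job to run.
public class MasterUrl {
    static String resolve(String configured) {
        return (configured == null || configured.isEmpty())
                ? "local[4]"   // in-process, 4 worker threads
                : configured;  // e.g. "spark://192.168.88.130:7077"
    }

    public static void main(String[] args) {
        System.out.println(resolve(null));
        System.out.println(resolve("spark://192.168.88.130:7077"));
    }
}
```

Note that for receiver-based streams, local mode needs more threads than receivers (hence local[4] rather than local[1]), otherwise the receiver occupies the only thread and no batch processing happens.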
RE: Kafka+Spark-streaming issue: Stream 0 received 0 blocks
Hi,

The spark master is working, and I have given the same url in the code: [inline screenshot: Spark master web UI]

The warning is gone, and the new log is:

---
Time: 141742785 ms
---

INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Starting job streaming job 141742785 ms.0 from job set of time 141742785 ms
INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 141742785 ms.0 from job set of time 141742785 ms
INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.028 s for time 141742785 ms (execution: 0.001 s)
INFO [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Added jobs for time 141742785 ms
INFO [sparkDriver-akka.actor.default-dispatcher-5] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 25 from persistence list
INFO [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 25
INFO [sparkDriver-akka.actor.default-dispatcher-5] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 24 from persistence list
INFO [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 24
INFO [sparkDriver-akka.actor.default-dispatcher-5] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[24] at BlockRDD at ReceiverInputDStream.scala:69 of time 141742785 ms
INFO [sparkDriver-akka.actor.default-dispatcher-4] scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks

---
Time: 1417427853000 ms
---

INFO [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Starting job streaming job 1417427853000 ms.0 from job set of time 1417427853000 ms
INFO [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 1417427853000 ms.0 from job set of time 1417427853000 ms
INFO [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.015 s for time 1417427853000 ms (execution: 0.001 s)
INFO [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Added jobs for time 1417427853000 ms
INFO [sparkDriver-akka.actor.default-dispatcher-4] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 27 from persistence list
INFO [sparkDriver-akka.actor.default-dispatcher-5] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 27
INFO [sparkDriver-akka.actor.default-dispatcher-4] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 26 from persistence list
INFO [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 26
INFO [sparkDriver-akka.actor.default-dispatcher-4] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[26] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417427853000 ms
INFO [sparkDriver-akka.actor.default-dispatcher-6] scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks

What should be my approach now? Need urgent help.

Regards,
Aiman

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Monday, December 01, 2014 3:56 PM
To: Sarosh, M.
Cc: user@spark.apache.org
Subject: Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

It says:

14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

A quick guess would be that you are giving the wrong master url (spark://192.168.88.130:7077). Open the web UI running on port 8080 and use the master url listed there in the top left corner of the page.

Thanks
Best Regards

On Mon, Dec 1, 2014 at 3:42 PM, <m.sar...@accenture.com> wrote:

Hi,

I am integrating Kafka and Spark, using spark-streaming. I have created a topic as a kafka producer:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

I am publishing messages in kafka and trying to read them using spark-streaming java code and displaying them on screen.

The daemons are all up: Spark-master, worker; zookeeper; kafka.

I am writing a java code for doing it, using KafkaUtils.createStream. The code is below:

package com.spark;

import scala.Tuple2;
import kafka.serializer.Decoder;
import kafka.serializer.Encoder;
import org.apache.spark.streaming.Duration;
import org.apache.spa
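Side note (not part of the original thread): KafkaUtils.createStream, mentioned above, takes a map from topic name to the number of receiver threads for that topic. A dependency-free sketch of just that map-building step, one thread per topic as in the thread's code; the class and method names here are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: build the topic -> receiver-thread-count map that
// KafkaUtils.createStream expects, from a comma-separated topics argument.
public class TopicMapDemo {
    static Map<String, Integer> buildTopicMap(String topicArg) {
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        for (String t : topicArg.split(",")) {
            topicMap.put(t, 1); // one receiver thread per topic
        }
        return topicMap;
    }

    public static void main(String[] args) {
        Map<String, Integer> m = buildTopicMap("test,logs");
        System.out.println(m.size());      // 2
        System.out.println(m.get("test")); // 1
    }
}
```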
Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks
It says:

14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

A quick guess would be that you are giving the wrong master url (spark://192.168.88.130:7077). Open the web UI running on port 8080 and use the master url listed there in the top left corner of the page.

Thanks
Best Regards

On Mon, Dec 1, 2014 at 3:42 PM, wrote:

> Hi,
>
> I am integrating Kafka and Spark, using spark-streaming. I have created a topic as a kafka producer:
>
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
>
> I am publishing messages in kafka and trying to read them using spark-streaming java code and displaying them on screen.
>
> The daemons are all up: Spark-master, worker; zookeeper; kafka.
>
> I am writing a java code for doing it, using KafkaUtils.createStream. The code is below:
>
> package com.spark;
>
> import scala.Tuple2;
> import kafka.serializer.Decoder;
> import kafka.serializer.Encoder;
> import org.apache.spark.streaming.Duration;
> import org.apache.spark.*;
> import org.apache.spark.api.java.function.*;
> import org.apache.spark.api.java.*;
> import org.apache.spark.streaming.kafka.KafkaUtils;
> import org.apache.spark.streaming.kafka.*;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.api.java.JavaPairDStream;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
> import java.util.Map;
> import java.util.HashMap;
>
> public class SparkStream {
>     public static void main(String args[])
>     {
>         if (args.length != 3)
>         {
>             System.out.println("Usage: spark-submit --class com.spark.SparkStream target/SparkStream-with-dependencies.jar ");
>             System.exit(1);
>         }
>
>         Map<String, Integer> topicMap = new HashMap<String, Integer>();
>         String[] topic = args[2].split(",");
>         for (String t : topic)
>         {
>             topicMap.put(t, new Integer(1));
>         }
>
>         JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
>         JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
>
>         System.out.println("Connection done");
>         JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>()
>         {
>             public String call(Tuple2<String, String> message)
>             {
>                 System.out.println("NewMessage: " + message._2()); // for debugging
>                 return message._2();
>             }
>         });
>
>         data.print();
>
>         jssc.start();
>         jssc.awaitTermination();
>     }
> }
>
> I am running the job, and at another terminal I am running kafka-producer to publish messages:
>
> #bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> >Hi kafka
> >second message
> >another message
>
> But the output logs at the spark-streaming console don't show the messages; they show zero blocks received:
>
> ---
> Time: 1417107363000 ms
> ---
>
> 14/11/27 11:56:03 INFO scheduler.JobScheduler: Starting job streaming job 1417107363000 ms.0 from job set of time 1417107363000 ms
> 14/11/27 11:56:03 INFO scheduler.JobScheduler: Finished job streaming job 1417107363000 ms.0 from job set of time 1417107363000 ms
> 14/11/27 11:56:03 INFO scheduler.JobScheduler: Total delay: 0.008 s for time 1417107363000 ms (execution: 0.000 s)
> 14/11/27 11:56:03 INFO scheduler.JobScheduler: Added jobs for time 1417107363000 ms
> 14/11/27 11:56:03 INFO rdd.BlockRDD: Removing RDD 13 from persistence list
> 14/11/27 11:56:03 INFO storage.BlockManager: Removing RDD 13
> 14/11/27 11:56:03 INFO kafka.KafkaInputDStream: Removing blocks of RDD BlockRDD[13] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417107363000 ms
> 14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are regi
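Side note (not part of the original thread): the "Time: 1417107363000 ms" headers in the log are epoch milliseconds marking each batch boundary, aligned to the 3000 ms batch interval passed as new Duration(3000). A small, dependency-free sketch confirming both facts for one of the logged timestamps:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Illustrative sketch: the batch timestamps in the log are epoch millis,
// and each falls on a multiple of the 3000 ms batch Duration.
public class BatchTime {
    public static void main(String[] args) {
        long batchMs = 1417107363000L; // a "Time: ... ms" value from the log
        System.out.println(batchMs % 3000); // aligned to the 3 s interval

        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        System.out.println(fmt.format(new Date(batchMs)));
    }
}
```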