Hi,

The Spark master is running, and I have given the same URL in the code:
[inline screenshot attachment: image001.png]

The warning is gone, and the new log is:
-------------------------------------------
Time: 1417427850000 ms
-------------------------------------------

INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Starting job streaming job 1417427850000 ms.0 
from job set of time 1417427850000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Finished job streaming job 1417427850000 ms.0 
from job set of time 1417427850000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Total delay: 0.028 s for time 1417427850000 ms 
(execution: 0.001 s)
INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Added jobs for time 1417427850000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-5] rdd.MappedRDD 
(Logging.scala:logInfo(59)) - Removing RDD 25 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManager 
(Logging.scala:logInfo(59)) - Removing RDD 25
INFO  [sparkDriver-akka.actor.default-dispatcher-5] rdd.BlockRDD 
(Logging.scala:logInfo(59)) - Removing RDD 24 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager 
(Logging.scala:logInfo(59)) - Removing RDD 24
INFO  [sparkDriver-akka.actor.default-dispatcher-5] kafka.KafkaInputDStream 
(Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[24] at BlockRDD 
at ReceiverInputDStream.scala:69 of time 1417427850000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.ReceiverTracker 
(Logging.scala:logInfo(59)) - Stream 0 received 0 blocks
-------------------------------------------
Time: 1417427853000 ms
-------------------------------------------

INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Starting job streaming job 1417427853000 ms.0 
from job set of time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Finished job streaming job 1417427853000 ms.0 
from job set of time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Total delay: 0.015 s for time 1417427853000 ms 
(execution: 0.001 s)
INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Added jobs for time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.MappedRDD 
(Logging.scala:logInfo(59)) - Removing RDD 27 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-5] storage.BlockManager 
(Logging.scala:logInfo(59)) - Removing RDD 27
INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.BlockRDD 
(Logging.scala:logInfo(59)) - Removing RDD 26 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager 
(Logging.scala:logInfo(59)) - Removing RDD 26
INFO  [sparkDriver-akka.actor.default-dispatcher-4] kafka.KafkaInputDStream 
(Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[26] at BlockRDD 
at ReceiverInputDStream.scala:69 of time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-6] scheduler.ReceiverTracker 
(Logging.scala:logInfo(59)) - Stream 0 received 0 blocks

What should my approach be now?
I need urgent help.

Regards,
Aiman

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Monday, December 01, 2014 3:56 PM
To: Sarosh, M.
Cc: user@spark.apache.org
Subject: Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

It says:

 14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not 
accepted any resources; check your cluster UI to ensure that workers are 
registered and have sufficient memory

A quick guess would be that you are giving the wrong master URL 
(spark://192.168.88.130:7077). Open the web UI running on port 8080 and use 
the master URL listed in the top-left corner of the page.
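
For example, the same constructor can be given the exact URL from the UI; a 
minimal sketch (the host below is only a placeholder for whatever your UI 
lists):

    JavaStreamingContext jssc = new JavaStreamingContext(
            "spark://<master-host-from-ui>:7077",  // copy verbatim from the :8080 page
            "SparkStream", new Duration(3000));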

Thanks
Best Regards

On Mon, Dec 1, 2014 at 3:42 PM, <m.sar...@accenture.com> wrote:
Hi,

I am integrating Kafka and Spark using Spark Streaming. I have created a topic 
on the Kafka broker:

    bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test
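
For reference, the topic and its partition assignment can be double-checked 
with (assuming the same ZooKeeper address):

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test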


I am publishing messages to Kafka and trying to read them using Spark 
Streaming Java code, displaying them on screen.
The daemons are all up: Spark master and worker, ZooKeeper, and Kafka.
I am writing Java code for this using KafkaUtils.createStream; the code is 
below:

package com.spark;

import java.util.HashMap;
import java.util.Map;

import scala.Tuple2;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class SparkStream {
    public static void main(String[] args) {
        if (args.length != 3) {
            System.out.println("Usage: spark-submit --class com.spark.SparkStream "
                    + "target/SparkStream-with-dependencies.jar <zookeeper_ip> "
                    + "<group_name> <topic1,topic2,...>");
            System.exit(1);
        }

        // One receiver thread per topic
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        for (String t : args[2].split(",")) {
            topicMap.put(t, 1);
        }

        // Master URL; must match what the web UI on port 8080 shows
        JavaStreamingContext jssc = new JavaStreamingContext(
                "spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        System.out.println("Connection done");
        JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> message) {
                System.out.println("NewMessage: " + message._2()); // for debugging
                return message._2();
            }
        });
        data.print();

        jssc.start();
        jssc.awaitTermination();
    }
}


I am running the job, and in another terminal I am running the Kafka producer 
to publish messages:
#bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>    Hi kafka
>    second message
>    another message

But the output logs at the Spark Streaming console don't show the messages; 
they show zero blocks received:


    -------------------------------------------
    Time: 1417107363000 ms
    -------------------------------------------

    14/11/27 11:56:03 INFO scheduler.JobScheduler: Starting job streaming job 
1417107363000 ms.0 from job set of time 1417107363000 ms
    14/11/27 11:56:03 INFO scheduler.JobScheduler: Finished job streaming job 
1417107363000 ms.0 from job set of time 1417107363000 ms
    14/11/27 11:56:03 INFO scheduler.JobScheduler: Total delay: 0.008 s for 
time 1417107363000 ms (execution: 0.000 s)
    14/11/27 11:56:03 INFO scheduler.JobScheduler: Added jobs for time 
1417107363000 ms
    14/11/27 11:56:03 INFO rdd.BlockRDD: Removing RDD 13 from persistence list
    14/11/27 11:56:03 INFO storage.BlockManager: Removing RDD 13
    14/11/27 11:56:03 INFO kafka.KafkaInputDStream: Removing blocks of RDD 
BlockRDD[13] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417107363000 
ms
    14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not 
accepted any resources; check your cluster UI to ensure that workers are 
registered and have sufficient memory
    14/11/27 11:56:06 INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks


Why aren't the data blocks getting received? I have tried the Kafka console 
producer and consumer (bin/kafka-console-producer... and 
bin/kafka-console-consumer...) and they work perfectly, but why not the code 
above? Please help me.


Regards,
Aiman Sarosh

