Hi,

I am integrating Kafka and Spark using Spark Streaming. I have created a Kafka topic:

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
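
For reference, the topic can be verified with the same tool (standard kafka-topics.sh flags from the Kafka quickstart):

    # describe the topic to confirm it exists with the expected
    # partition count and replication factor
    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test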


I am publishing messages to Kafka and trying to read them with Spark Streaming Java code, displaying them on screen.
The daemons are all up: Spark master and worker, ZooKeeper, and Kafka (a quick jps check is sketched below).
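This is how I confirm the daemons are running; jps lists each JVM by its main class, so all four should appear:

    # list running JVMs; expected entries for this setup:
    # Master, Worker (Spark standalone), QuorumPeerMain (ZooKeeper), Kafka
    jps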
The Java code, which uses KafkaUtils.createStream, is below:

package com.spark;

import java.util.HashMap;
import java.util.Map;

import scala.Tuple2;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class SparkStream {
    public static void main(String[] args) {
        if (args.length != 3) {
            System.out.println("Usage: spark-submit --class com.spark.SparkStream "
                    + "target/SparkStream-with-dependencies.jar "
                    + "<zookeeper_ip> <group_name> <topic1,topic2,...>");
            System.exit(1);
        }

        // one receiver thread per topic
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        for (String topic : args[2].split(",")) {
            topicMap.put(topic, 1);
        }

        // 3-second batch interval, running against the standalone master
        JavaStreamingContext jssc = new JavaStreamingContext(
                "spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        System.out.println("Connection done");

        // keep only the message payload, dropping the key
        JavaDStream<String> data = messages.map(
                new Function<Tuple2<String, String>, String>() {
                    public String call(Tuple2<String, String> message) {
                        System.out.println("NewMessage: " + message._2()); // for debugging
                        return message._2();
                    }
                });
        data.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
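
I submit the job roughly like this (arguments follow the usage message above; "spark-group" is just an example consumer group id, and the topic is the one created earlier):

    # <zookeeper_ip> <group_name> <topics> as per the usage message
    bin/spark-submit --class com.spark.SparkStream \
        target/SparkStream-with-dependencies.jar \
        localhost:2181 spark-group test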


I am running the job, and in another terminal I am running the Kafka console producer to publish messages:

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    > Hi kafka
    > second message
    > another message

But the output logs on the Spark Streaming console don't show the messages; instead they show zero blocks received:


    -------------------------------------------
    Time: 1417107363000 ms
    -------------------------------------------

    14/11/27 11:56:03 INFO scheduler.JobScheduler: Starting job streaming job 1417107363000 ms.0 from job set of time 1417107363000 ms
    14/11/27 11:56:03 INFO scheduler.JobScheduler: Finished job streaming job 1417107363000 ms.0 from job set of time 1417107363000 ms
    14/11/27 11:56:03 INFO scheduler.JobScheduler: Total delay: 0.008 s for time 1417107363000 ms (execution: 0.000 s)
    14/11/27 11:56:03 INFO scheduler.JobScheduler: Added jobs for time 1417107363000 ms
    14/11/27 11:56:03 INFO rdd.BlockRDD: Removing RDD 13 from persistence list
    14/11/27 11:56:03 INFO storage.BlockManager: Removing RDD 13
    14/11/27 11:56:03 INFO kafka.KafkaInputDStream: Removing blocks of RDD BlockRDD[13] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417107363000 ms
    14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
    14/11/27 11:56:06 INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks


Why aren't any data blocks being received? I have tried the Kafka producer and consumer on the console (bin/kafka-console-producer... and bin/kafka-console-consumer...) and they work perfectly (the consumer invocation is sketched below), so why doesn't the code above? Please help me.
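
For completeness, the console consumer that does see the messages was invoked roughly like this (standard flags from the Kafka quickstart; --from-beginning simply replays earlier messages):

    # this consumer prints every published message correctly
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning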


Regards,
Aiman Sarosh

