What is the value of your Spark master setting (spark.master)? If it is plain
local, only one thread can run, and that may be why your job is stuck.
Set it to local[*] to make the thread pool size equal to the number of cores.
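
A minimal sketch of that change, assuming you are running in local mode. The
app name below is a placeholder, not taken from your code. On a real cluster
the master is normally passed with spark-submit --master rather than
hard-coded:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// "KafkaProducerApp" is a hypothetical name; use your own applicationName.
val sparkConf = new SparkConf()
  .setAppName("KafkaProducerApp")
  .setMaster("local[*]") // one worker thread per available core

// Same 2-second batch interval as in the quoted code below.
val ssc = new StreamingContext(sparkConf, Seconds(2))
```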

Raghav
On Sep 11, 2015 6:06 AM, "Atul Kulkarni" <atulskulka...@gmail.com> wrote:

> Hi Folks,
>
> Below is the code I have for a Spark-based Kafka producer, meant to take
> advantage of multiple executors reading files in parallel on my cluster,
> but I am stuck: the program is not making any progress.
>
> Below is my scrubbed code:
>
> val sparkConf = new SparkConf().setAppName(applicationName)
> val ssc = new StreamingContext(sparkConf, Seconds(2))
>
> val producerObj = ssc.sparkContext.broadcast(KafkaSink(kafkaProperties))
>
> val zipFileDStreams = ssc.textFileStream(inputFiles)
> zipFileDStreams.foreachRDD {
>   rdd =>
>     rdd.foreachPartition(
>       partition => {
>         partition.foreach{
>           case (logLineText) =>
>             println(logLineText)
>             producerObj.value.send(topics, logLineText)
>         }
>       }
>     )
> }
>
> ssc.start()
> ssc.awaitTermination()
>
> ssc.stop()
>
> The code for KafkaSink is as follows.
>
> class KafkaSink(createProducer: () => KafkaProducer[Array[Byte], Array[Byte]])
>     extends Serializable {
>
>   lazy val producer = createProducer()
>   val logParser = new LogParser()
>
>   def send(topic: String, value: String): Unit = {
>
>     val logLineBytes =
>       Bytes.toBytes(logParser.avroEvent(value.split("\t")).toString)
>     producer.send(
>       new ProducerRecord[Array[Byte], Array[Byte]](topic, logLineBytes))
>   }
> }
>
> object KafkaSink {
>   def apply(config: Properties): KafkaSink = {
>
>     val f = () => {
>       val producer =
>         new KafkaProducer[Array[Byte], Array[Byte]](config, null, null)
>
>       sys.addShutdownHook {
>         producer.close()
>       }
>       producer
>     }
>
>     new KafkaSink(f)
>   }
> }
>
> Disclaimer: this code is inspired by
> http://allegro.tech/spark-kafka-integration.html.
>
> The job just sits there; I cannot see any job stages being created.
> One thing I want to mention: I am trying to read gzipped files from HDFS.
> Could it be that the streaming context is not able to read *.gz files?
>
>
> I am not sure what more details I can provide to help explain my problem.
>
>
> --
> Regards,
> Atul Kulkarni
>
