Hi Raghavendra,

Thanks for your answers. I am passing 10 executors, and I am not sure that is the problem. It is still hung.
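For reference, one rough way to confirm from the driver that those 10 executors actually registered (rather than the application still waiting on YARN containers) is to log the executor count before starting the streaming context. This is only a sketch against the Spark 1.x API, using the ssc from the code below; getExecutorMemoryStatus also returns an entry for the driver itself, hence the minus one.

// Rough check, assuming Spark 1.x: getExecutorMemoryStatus has one entry per
// block manager, including the driver, so subtract 1 to get the executor count.
val registeredExecutors = ssc.sparkContext.getExecutorMemoryStatus.size - 1
println(s"Registered executors: $registeredExecutors")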
Regards,
Atul.

On Fri, Sep 11, 2015 at 12:40 AM, Raghavendra Pandey <raghavendra.pan...@gmail.com> wrote:

> You can pass the number of executors via the command line option
> --num-executors. You need more than 2 executors to make spark-streaming
> work.
>
> For more details on the command line options, please go through
> http://spark.apache.org/docs/latest/running-on-yarn.html.
>
> On Fri, Sep 11, 2015 at 10:52 AM, Atul Kulkarni <atulskulka...@gmail.com> wrote:
>
>> I am submitting the job with yarn-cluster mode.
>>
>> spark-submit --master yarn-cluster ...
>>
>> On Thu, Sep 10, 2015 at 7:50 PM, Raghavendra Pandey <raghavendra.pan...@gmail.com> wrote:
>>
>>> What is the value of the spark master conf? By default it is local, which
>>> means only one thread can run, and that is why your job is stuck.
>>> Specify local[*] to make the thread pool equal to the number of cores...
>>>
>>> Raghav
>>>
>>> On Sep 11, 2015 6:06 AM, "Atul Kulkarni" <atulskulka...@gmail.com> wrote:
>>>
>>>> Hi Folks,
>>>>
>>>> Below is the code I have for a Spark-based Kafka producer, meant to take
>>>> advantage of multiple executors reading files in parallel on my cluster,
>>>> but I am stuck: the program is not making any progress.
>>>>
>>>> Below is my scrubbed code:
>>>>
>>>> val sparkConf = new SparkConf().setAppName(applicationName)
>>>> val ssc = new StreamingContext(sparkConf, Seconds(2))
>>>>
>>>> val producerObj = ssc.sparkContext.broadcast(KafkaSink(kafkaProperties))
>>>>
>>>> val zipFileDStreams = ssc.textFileStream(inputFiles)
>>>> zipFileDStreams.foreachRDD { rdd =>
>>>>   rdd.foreachPartition { partition =>
>>>>     partition.foreach { logLineText =>
>>>>       println(logLineText)
>>>>       producerObj.value.send(topics, logLineText)
>>>>     }
>>>>   }
>>>> }
>>>>
>>>> ssc.start()
>>>> ssc.awaitTermination()
>>>>
>>>> ssc.stop()
>>>>
>>>> The code for KafkaSink is as follows.
>>>>
>>>> class KafkaSink(createProducer: () => KafkaProducer[Array[Byte], Array[Byte]]) extends Serializable {
>>>>
>>>>   lazy val producer = createProducer()
>>>>   val logParser = new LogParser()
>>>>
>>>>   def send(topic: String, value: String): Unit = {
>>>>     val logLineBytes = Bytes.toBytes(logParser.avroEvent(value.split("\t")).toString)
>>>>     producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, logLineBytes))
>>>>   }
>>>> }
>>>>
>>>> object KafkaSink {
>>>>   def apply(config: Properties): KafkaSink = {
>>>>
>>>>     val f = () => {
>>>>       val producer = new KafkaProducer[Array[Byte], Array[Byte]](config, null, null)
>>>>
>>>>       sys.addShutdownHook {
>>>>         producer.close()
>>>>       }
>>>>       producer
>>>>     }
>>>>
>>>>     new KafkaSink(f)
>>>>   }
>>>> }
>>>>
>>>> Disclaimer: it is based on code inspired by
>>>> http://allegro.tech/spark-kafka-integration.html.
>>>>
>>>> The job just sits there; I cannot see any job stages being created.
>>>> Something I want to mention - I am trying to read gzipped files from
>>>> HDFS - could it be that the streaming context is not able to read
>>>> *.gz files?
>>>>
>>>> I am not sure what more details I can provide to help explain my problem.
>>>>
>>>> --
>>>> Regards,
>>>> Atul Kulkarni
>>>
>>
>> --
>> Regards,
>> Atul Kulkarni
>

--
Regards,
Atul Kulkarni
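A note on the quoted question about the job sitting there with no stages: textFileStream only processes files that are written or moved into the monitored directory after the streaming context has started, so files already present are ignored by default. A minimal sketch, assuming the Spark 1.x streaming API, Hadoop's new-API TextInputFormat, and the same inputFiles path from the quoted code, of the underlying fileStream call with newFilesOnly = false so that pre-existing files are also picked up:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Like textFileStream, but also processes files already in the directory
// when the streaming context starts (newFilesOnly = false).
val zipFileDStreams = ssc.fileStream[LongWritable, Text, TextInputFormat](
  inputFiles, (path: Path) => true, newFilesOnly = false
).map { case (_, line) => line.toString }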
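As for whether the streaming context can read *.gz files: the underlying TextInputFormat decompresses gzip transparently (each gzipped file is read by a single task, since gzip is not splittable), so one quick way to rule out a decompression problem is to read the same directory with a plain batch call first. A minimal sketch, again assuming the inputFiles path from the quoted code:

// Batch read of the same directory; if this prints lines, the .gz files are readable.
val sampleLines = ssc.sparkContext.textFile(inputFiles).take(5)
sampleLines.foreach(println)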