You can pass the number of executors via the command line option --num-executors. You need more than 2 executors to make Spark Streaming work.
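For example, something along these lines (the executor count, cores, memory, class name, and jar name below are just placeholders to illustrate the options; adjust them for your cluster and application):

  # all values below are illustrative placeholders, not taken from your job
  spark-submit --master yarn-cluster \
    --num-executors 4 \
    --executor-cores 2 \
    --executor-memory 2g \
    --class com.example.KafkaProducerJob \
    kafka-producer-assembly.jar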
For more details on the command line options, please go through
http://spark.apache.org/docs/latest/running-on-yarn.html.

On Fri, Sep 11, 2015 at 10:52 AM, Atul Kulkarni <atulskulka...@gmail.com> wrote:

> I am submitting the job with yarn-cluster mode.
>
> spark-submit --master yarn-cluster ...
>
> On Thu, Sep 10, 2015 at 7:50 PM, Raghavendra Pandey <raghavendra.pan...@gmail.com> wrote:
>
>> What is the value of the spark master conf? By default it is local, which
>> means only one thread can run, and that is why your job is stuck.
>> Specify it as local[*] to make the thread pool equal to the number of cores.
>>
>> Raghav
>>
>> On Sep 11, 2015 6:06 AM, "Atul Kulkarni" <atulskulka...@gmail.com> wrote:
>>
>>> Hi Folks,
>>>
>>> Below is the code I have for a Spark-based Kafka producer, meant to take
>>> advantage of multiple executors reading files in parallel on my cluster,
>>> but I am stuck: the program is not making any progress.
>>>
>>> Below is my scrubbed code:
>>>
>>> val sparkConf = new SparkConf().setAppName(applicationName)
>>> val ssc = new StreamingContext(sparkConf, Seconds(2))
>>>
>>> val producerObj = ssc.sparkContext.broadcast(KafkaSink(kafkaProperties))
>>>
>>> val zipFileDStreams = ssc.textFileStream(inputFiles)
>>> zipFileDStreams.foreachRDD { rdd =>
>>>   rdd.foreachPartition( partition => {
>>>     partition.foreach { case (logLineText) =>
>>>       println(logLineText)
>>>       producerObj.value.send(topics, logLineText)
>>>     }
>>>   })
>>> }
>>>
>>> ssc.start()
>>> ssc.awaitTermination()
>>>
>>> ssc.stop()
>>>
>>> The code for KafkaSink is as follows:
>>>
>>> class KafkaSink(createProducer: () => KafkaProducer[Array[Byte], Array[Byte]])
>>>   extends Serializable {
>>>
>>>   lazy val producer = createProducer()
>>>   val logParser = new LogParser()
>>>
>>>   def send(topic: String, value: String): Unit = {
>>>     val logLineBytes = Bytes.toBytes(logParser.avroEvent(value.split("\t")).toString)
>>>     producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, logLineBytes))
>>>   }
>>> }
>>>
>>> object KafkaSink {
>>>   def apply(config: Properties): KafkaSink = {
>>>
>>>     val f = () => {
>>>       val producer = new KafkaProducer[Array[Byte], Array[Byte]](config, null, null)
>>>
>>>       sys.addShutdownHook {
>>>         producer.close()
>>>       }
>>>       producer
>>>     }
>>>
>>>     new KafkaSink(f)
>>>   }
>>> }
>>>
>>> Disclaimer: it is based on the code at
>>> http://allegro.tech/spark-kafka-integration.html.
>>>
>>> The job just sits there; I cannot see any job stages being created.
>>> Something I want to mention: I am trying to read gzipped files from HDFS.
>>> Could it be that the streaming context is not able to read *.gz files?
>>>
>>> I am not sure what more details I can provide to help explain my problem.
>>>
>>> --
>>> Regards,
>>> Atul Kulkarni
>>>
>>
>
> --
> Regards,
> Atul Kulkarni
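P.S. If you want to rule out a resource problem with a quick local run, here is a minimal sketch of what Raghav suggested above, i.e. setting the master to local[*] so the thread pool matches the number of local cores. The app name is a placeholder; the 2-second batch interval just mirrors your code. On YARN you would leave setMaster out and keep --master yarn-cluster on the command line instead.

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  // Local-testing sketch only; "LocalKafkaProducerTest" is a placeholder name.
  val sparkConf = new SparkConf()
    .setAppName("LocalKafkaProducerTest")
    .setMaster("local[*]")  // use all local cores so the streaming job is not starved of threads
  val ssc = new StreamingContext(sparkConf, Seconds(2))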