I am submitting the job in yarn-cluster mode:

spark-submit --master yarn-cluster ...
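For reference, the full command follows the usual yarn-cluster pattern; the class name, resource settings, jar, and input path below are illustrative placeholders, not the exact values:

    # Sketch of a typical yarn-cluster submission; every value below is
    # a placeholder, not the actual command.
    spark-submit \
      --master yarn-cluster \
      --class com.example.SparkKafkaProducer \
      --num-executors 4 \
      --executor-cores 2 \
      --executor-memory 2g \
      spark-kafka-producer.jar hdfs:///path/to/input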
On Thu, Sep 10, 2015 at 7:50 PM, Raghavendra Pandey
<raghavendra.pan...@gmail.com> wrote:

> What is the value of your spark master conf? By default it is local, which
> means only one thread can run, and that is why your job is stuck.
> Specify local[*] to make the thread pool equal to the number of cores.
>
> Raghav
>
> On Sep 11, 2015 6:06 AM, "Atul Kulkarni" <atulskulka...@gmail.com> wrote:
>
>> Hi Folks,
>>
>> Below is the code I have for a Spark-based Kafka producer, written to
>> take advantage of multiple executors reading files in parallel on my
>> cluster, but I am stuck: the program is not making any progress.
>>
>> Below is my scrubbed code:
>>
>> val sparkConf = new SparkConf().setAppName(applicationName)
>> val ssc = new StreamingContext(sparkConf, Seconds(2))
>>
>> val producerObj = ssc.sparkContext.broadcast(KafkaSink(kafkaProperties))
>>
>> val zipFileDStreams = ssc.textFileStream(inputFiles)
>> zipFileDStreams.foreachRDD { rdd =>
>>   rdd.foreachPartition { partition =>
>>     partition.foreach { logLineText =>
>>       println(logLineText)
>>       producerObj.value.send(topics, logLineText)
>>     }
>>   }
>> }
>>
>> ssc.start()
>> ssc.awaitTermination()
>>
>> ssc.stop()
>>
>> The code for KafkaSink is as follows:
>>
>> class KafkaSink(createProducer: () => KafkaProducer[Array[Byte], Array[Byte]])
>>     extends Serializable {
>>
>>   lazy val producer = createProducer()
>>   val logParser = new LogParser()
>>
>>   def send(topic: String, value: String): Unit = {
>>     val logLineBytes =
>>       Bytes.toBytes(logParser.avroEvent(value.split("\t")).toString)
>>     producer.send(
>>       new ProducerRecord[Array[Byte], Array[Byte]](topic, logLineBytes))
>>   }
>> }
>>
>> object KafkaSink {
>>   def apply(config: Properties): KafkaSink = {
>>     val f = () => {
>>       val producer =
>>         new KafkaProducer[Array[Byte], Array[Byte]](config, null, null)
>>       sys.addShutdownHook {
>>         producer.close()
>>       }
>>       producer
>>     }
>>     new KafkaSink(f)
>>   }
>> }
>>
>> Disclaimer: this is based on code inspired by
>> http://allegro.tech/spark-kafka-integration.html.
>>
>> The job just sits there; I cannot see any job stages being created.
>> Something I want to mention: I am trying to read gzipped files from HDFS.
>> Could it be that the streaming context is not able to read *.gz files?
>>
>> I am not sure what more details I can provide to help explain my problem.
>>
>> --
>> Regards,
>> Atul Kulkarni

--
Regards,
Atul Kulkarni
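PS: For completeness, a minimal sketch of what Raghavendra's local[*] suggestion would look like in code; the application name here is a placeholder:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Minimal sketch: local[*] sizes the thread pool to the number of
    // cores, as suggested above. Hard-coding a master like this is only
    // useful for local testing.
    val sparkConf = new SparkConf()
      .setAppName("SparkKafkaProducer") // placeholder application name
      .setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

Note that a master set programmatically on SparkConf takes precedence over the --master flag passed to spark-submit, so when submitting to YARN it is safer to leave setMaster out of the application entirely.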