You can pass the number of executors via command line option
--num-executors.You need more than 2 executors to make spark-streaming
working.

For more details on command line option, please go through
http://spark.apache.org/docs/latest/running-on-yarn.html.


On Fri, Sep 11, 2015 at 10:52 AM, Atul Kulkarni <atulskulka...@gmail.com>
wrote:

> I am submitting the job with yarn-cluster mode.
>
> spark-submit --master yarn-cluster ...
>
> On Thu, Sep 10, 2015 at 7:50 PM, Raghavendra Pandey <
> raghavendra.pan...@gmail.com> wrote:
>
>> What is the value of spark master conf.. By default it is local, that
>> means only one thread can run and that is why your job is stuck.
>> Specify it local[*], to make thread pool equal to number of cores...
>>
>> Raghav
>> On Sep 11, 2015 6:06 AM, "Atul Kulkarni" <atulskulka...@gmail.com> wrote:
>>
>>> Hi Folks,
>>>
>>> Below is the code  have for Spark based Kafka Producer to take advantage
>>> of multiple executors reading files in parallel on my cluster but I am
>>> stuck at The program not making any progress.
>>>
>>> Below is my scrubbed code:
>>>
>>> val sparkConf = new SparkConf().setAppName(applicationName)
>>> val ssc = new StreamingContext(sparkConf, Seconds(2))
>>>
>>> val producerObj = ssc.sparkContext.broadcast(KafkaSink(kafkaProperties))
>>>
>>> val zipFileDStreams = ssc.textFileStream(inputFiles)
>>> zipFileDStreams.foreachRDD {
>>>   rdd =>
>>>     rdd.foreachPartition(
>>>       partition => {
>>>         partition.foreach{
>>>           case (logLineText) =>
>>>             println(logLineText)
>>>             producerObj.value.send(topics, logLineText)
>>>         }
>>>       }
>>>     )
>>> }
>>>
>>> ssc.start()
>>> ssc.awaitTermination()
>>>
>>> ssc.stop()
>>>
>>> The code for KafkaSink is as follows.
>>>
>>> class KafkaSink(createProducer: () => KafkaProducer[Array[Byte], 
>>> Array[Byte]]) extends Serializable {
>>>
>>>   lazy val producer = createProducer()
>>>   val logParser = new LogParser()
>>>
>>>   def send(topic: String, value: String): Unit = {
>>>
>>>     val logLineBytes = 
>>> Bytes.toBytes(logParser.avroEvent(value.split("\t")).toString)
>>>     producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, 
>>> logLineBytes))
>>>   }
>>> }
>>>
>>> object KafkaSink {
>>>   def apply(config: Properties): KafkaSink = {
>>>
>>>     val f = () => {
>>>       val producer = new KafkaProducer[Array[Byte], Array[Byte]](config, 
>>> null, null)
>>>
>>>       sys.addShutdownHook {
>>>         producer.close()
>>>       }
>>>       producer
>>>     }
>>>
>>>     new KafkaSink(f)
>>>   }
>>> }
>>>
>>> Disclaimer: it is based on the code inspired by
>>> http://allegro.tech/spark-kafka-integration.html.
>>>
>>> The job just sits there I cannot see any Job Stages being created.
>>> Something I want to mention - I I am trying to read gzipped files from HDFS
>>> - could it be that Streaming context is not able to read *.gz files?
>>>
>>>
>>> I am not sure what more details I can provide to help explain my problem.
>>>
>>>
>>> --
>>> Regards,
>>> Atul Kulkarni
>>>
>>
>
>
> --
> Regards,
> Atul Kulkarni
>

Reply via email to