Hi Raghavendra,

Thanks for your answers. I am passing 10 executors, and I am not sure if
that is the problem. The job is still hung.

Regards,
Atul.


On Fri, Sep 11, 2015 at 12:40 AM, Raghavendra Pandey <
raghavendra.pan...@gmail.com> wrote:

> You can pass the number of executors via the command-line option
> --num-executors. You need more than 2 executors to make Spark Streaming
> work.
>
> For more details on the command-line options, please go through
> http://spark.apache.org/docs/latest/running-on-yarn.html.
>
>
> On Fri, Sep 11, 2015 at 10:52 AM, Atul Kulkarni <atulskulka...@gmail.com>
> wrote:
>
>> I am submitting the job with yarn-cluster mode.
>>
>> spark-submit --master yarn-cluster ...
>>
>> On Thu, Sep 10, 2015 at 7:50 PM, Raghavendra Pandey <
>> raghavendra.pan...@gmail.com> wrote:
>>
>>> What is the value of the Spark master conf? By default it is local, which
>>> means only one thread can run, and that is why your job is stuck.
>>> Set it to local[*] to make the thread pool equal to the number of cores.
>>>
>>> Raghav
>>> On Sep 11, 2015 6:06 AM, "Atul Kulkarni" <atulskulka...@gmail.com>
>>> wrote:
>>>
>>>> Hi Folks,
>>>>
>>>> Below is the code I have for a Spark-based Kafka producer, meant to take
>>>> advantage of multiple executors reading files in parallel on my cluster, but
>>>> I am stuck: the program is not making any progress.
>>>>
>>>> Below is my scrubbed code:
>>>>
>>>> val sparkConf = new SparkConf().setAppName(applicationName)
>>>> val ssc = new StreamingContext(sparkConf, Seconds(2))
>>>>
>>>> val producerObj = ssc.sparkContext.broadcast(KafkaSink(kafkaProperties))
>>>>
>>>> val zipFileDStreams = ssc.textFileStream(inputFiles)
>>>> zipFileDStreams.foreachRDD {
>>>>   rdd =>
>>>>     rdd.foreachPartition(
>>>>       partition => {
>>>>         partition.foreach{
>>>>           case (logLineText) =>
>>>>             println(logLineText)
>>>>             producerObj.value.send(topics, logLineText)
>>>>         }
>>>>       }
>>>>     )
>>>> }
>>>>
>>>> ssc.start()
>>>> ssc.awaitTermination()
>>>>
>>>> ssc.stop()
>>>>
>>>> The code for KafkaSink is as follows.
>>>>
>>>> class KafkaSink(createProducer: () => KafkaProducer[Array[Byte], Array[Byte]]) extends Serializable {
>>>>
>>>>   lazy val producer = createProducer()
>>>>   val logParser = new LogParser()
>>>>
>>>>   def send(topic: String, value: String): Unit = {
>>>>
>>>>     val logLineBytes = Bytes.toBytes(logParser.avroEvent(value.split("\t")).toString)
>>>>     producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, logLineBytes))
>>>>   }
>>>> }
>>>>
>>>> object KafkaSink {
>>>>   def apply(config: Properties): KafkaSink = {
>>>>
>>>>     val f = () => {
>>>>       val producer = new KafkaProducer[Array[Byte], Array[Byte]](config, null, null)
>>>>
>>>>       sys.addShutdownHook {
>>>>         producer.close()
>>>>       }
>>>>       producer
>>>>     }
>>>>
>>>>     new KafkaSink(f)
>>>>   }
>>>> }
>>>>
>>>> Disclaimer: the code is inspired by
>>>> http://allegro.tech/spark-kafka-integration.html.
>>>>
>>>> The job just sits there; I cannot see any job stages being created.
>>>> Something I want to mention: I am trying to read gzipped files from HDFS.
>>>> Could it be that the streaming context is not able to read *.gz files?
>>>>
>>>>
>>>> I am not sure what more details I can provide to help explain my
>>>> problem.
>>>>
>>>>
>>>> --
>>>> Regards,
>>>> Atul Kulkarni
>>>>
>>>
>>
>>
>> --
>> Regards,
>> Atul Kulkarni
>>
>
>


-- 
Regards,
Atul Kulkarni
