I am submitting the job in yarn-cluster mode:

spark-submit --master yarn-cluster ...
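
For reference, the fuller command looks roughly like the following; the class
name, jar name, and resource numbers are placeholders, not my actual values:

spark-submit --master yarn-cluster \
  --class com.example.SparkKafkaProducer \
  --num-executors 4 \
  --executor-cores 2 \
  --executor-memory 4G \
  spark-kafka-producer.jar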

On Thu, Sep 10, 2015 at 7:50 PM, Raghavendra Pandey <
raghavendra.pan...@gmail.com> wrote:

> What is the value of the spark.master conf? By default it is local, which
> means only one thread can run, and that is why your job is stuck.
> Specify local[*] to make the thread pool equal to the number of cores.
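>
> For a quick local test that would look something like this (the jar name is
> just a placeholder):
>
>   spark-submit --master "local[*]" your-app.jar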
>
> Raghav
> On Sep 11, 2015 6:06 AM, "Atul Kulkarni" <atulskulka...@gmail.com> wrote:
>
>> Hi Folks,
>>
>> Below is the code I have for a Spark-based Kafka producer, meant to take
>> advantage of multiple executors reading files in parallel on my cluster,
>> but I am stuck: the program is not making any progress.
>>
>> Below is my scrubbed code:
>>
>> val sparkConf = new SparkConf().setAppName(applicationName)
>> val ssc = new StreamingContext(sparkConf, Seconds(2))
>>
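>> // broadcast the serializable KafkaSink wrapper so each executor reuses a single, lazily created producer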
>> val producerObj = ssc.sparkContext.broadcast(KafkaSink(kafkaProperties))
>>
>> val zipFileDStreams = ssc.textFileStream(inputFiles)
>> zipFileDStreams.foreachRDD { rdd =>
>>   rdd.foreachPartition { partition =>
>>     partition.foreach { logLineText =>
>>       println(logLineText)
>>       producerObj.value.send(topics, logLineText)
>>     }
>>   }
>> }
>>
>> ssc.start()
>> ssc.awaitTermination()
>>
>> ssc.stop()
>>
>> The code for KafkaSink is as follows.
>>
>> class KafkaSink(createProducer: () => KafkaProducer[Array[Byte], Array[Byte]]) extends Serializable {
>>
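>>   // the producer is created lazily on first send, i.e. once per executor, so the non-serializable KafkaProducer never travels with the closure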
>>   lazy val producer = createProducer()
>>   val logParser = new LogParser()
>>
>>   def send(topic: String, value: String): Unit = {
>>
>>     val logLineBytes = Bytes.toBytes(logParser.avroEvent(value.split("\t")).toString)
>>     producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, logLineBytes))
>>   }
>> }
>>
>> object KafkaSink {
>>   def apply(config: Properties): KafkaSink = {
>>
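>>     // factory closure: it runs on the executor when the lazy producer is first needed; the shutdown hook closes the producer when that JVM exits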
>>     val f = () => {
>>       val producer = new KafkaProducer[Array[Byte], Array[Byte]](config, null, null)
>>
>>       sys.addShutdownHook {
>>         producer.close()
>>       }
>>       producer
>>     }
>>
>>     new KafkaSink(f)
>>   }
>> }
>>
>> Disclaimer: this code is based on the approach described at
>> http://allegro.tech/spark-kafka-integration.html.
>>
>> The job just sits there, and I cannot see any job stages being created.
>> One thing I want to mention: I am trying to read gzipped files from HDFS
>> - could it be that the streaming context is not able to read *.gz files?
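>>
>> As a sanity check on the input itself, I am planning to try a plain batch
>> read of the same path first - a minimal sketch, reusing the inputFiles
>> value from the code above:
>>
>> val sc = ssc.sparkContext
>> // textFile decompresses *.gz files through the Hadoop codecs, so if this
>> // count comes back the files themselves are readable
>> println("gz sanity count: " + sc.textFile(inputFiles).count())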
>>
>>
>> I am not sure what more details I can provide to help explain my problem.
>>
>>
>> --
>> Regards,
>> Atul Kulkarni
>>
>


-- 
Regards,
Atul Kulkarni
