Re: Question about MEMORY_AND_DISK persistence

2016-02-28 Thread Ashwin Giridharan
Hi Vishnu,

Spark caches at partition granularity: a partition will be either entirely
in memory or entirely on disk. Under MEMORY_AND_DISK, if a partition does
not fit in memory, the whole partition is spilled to disk; it is never
split between the two.
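
For example (minimal sketch; the path and sizes are made up), after

import org.apache.spark.storage.StorageLevel
val rdd = sc.textFile("hdfs://...").persist(StorageLevel.MEMORY_AND_DISK)
rdd.count()  // materialize the cache

the Storage tab of the web UI (or sc.getRDDStorageInfo) will show each
cached block counted entirely under memory or entirely under disk, never
split across the two.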

-Ashwin
On Feb 28, 2016 15:09, "Vishnu Viswanath" 
wrote:

> Hi All,
>
> I have a question regarding Persistence (MEMORY_AND_DISK)
>
> Suppose I am trying to persist an RDD that has 2 partitions, and only
> partition 1 fits in memory completely, but some part of partition 2 could
> also fit. Will Spark keep that portion of partition 2 in memory and the
> rest on disk, or will the whole 2nd partition be kept on disk?
>
> Regards,
> Vishnu
>


Re: Has anybody ever tried running Spark Streaming on 500 text streams?

2015-07-31 Thread Ashwin Giridharan
Thanks a lot @Das @Cody. I moved from the receiver-based stream to the
direct stream, and I can get the topics from the offset ranges!!
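
For anyone who finds this later, the pattern looks roughly like this
(sketch; assumes the Spark 1.x direct Kafka API, and kafkaParams /
topicSet are whatever you already pass in):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder,
  StringDecoder](ssc, kafkaParams, topicSet)

stream.foreachRDD { rdd =>
  // In the direct API, RDD partitions map 1:1 to Kafka topic-partitions,
  // so the offset range at index i describes RDD partition i.
  val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.mapPartitionsWithIndex { (i, records) =>
    records.map(r => (offsets(i).topic, r))  // tag each record with its topic
  }.foreach { case (topic, record) =>
    // topic-specific action here
  }
}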

On Fri, Jul 31, 2015 at 4:41 PM, Brandon White 
wrote:

> Tathagata,
>
> Could the bottleneck possibly be the number of executor nodes in our
> cluster? Since we are creating 500 DStreams based on 500 text-file
> directories, do we need at least 500 executors / nodes to act as
> receivers for each one of the streams?
>
> On Tue, Jul 28, 2015 at 6:09 PM, Tathagata Das 
> wrote:
>
>> @Ashwin: You could append the topic to the data.
>>
>> val kafkaStreams = topics.map { topic =>
>>   KafkaUtils.createDirectStream(topic...).map { x => (x, topic) }
>> }
>> val unionedStream = context.union(kafkaStreams)
>>
>>
>> @Brandon:
>> I don't recommend it, but you could do something crazy like use
>> foreachRDD to farm the jobs out to a thread pool, while the final
>> foreachRDD waits for all the jobs to complete.
>>
>> manyDStreams.foreach { dstream =>
>>   dstream.foreachRDD { rdd =>
>>     // Add a runnable that runs the job on this RDD to the thread pool.
>>     // This does not wait for the job to finish.
>>   }
>> }
>>
>> anyOfTheManyDStreams.foreachRDD { _ =>
>>   // Wait for all of the current batch's jobs in the thread pool to
>>   // complete.
>> }
>>
>>
>> This would run all the Spark jobs of the batch in parallel in the thread
>> pool, but it would also make sure all the jobs finish before the batch is
>> marked as completed.
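>>
>> Fleshed out, that might look roughly like the following (untested
>> sketch; the pool size and the per-RDD action are placeholders):
>>
>> import java.util.concurrent.Executors
>> import scala.concurrent._
>> import scala.concurrent.duration._
>>
>> // Thread pool that farms out the per-RDD Spark jobs.
>> implicit val ec = ExecutionContext.fromExecutorService(
>>   Executors.newFixedThreadPool(16))
>> val pending = new java.util.concurrent.ConcurrentLinkedQueue[Future[Unit]]
>>
>> manyDStreams.foreach { dstream =>
>>   dstream.foreachRDD { rdd =>
>>     // Submit the job without waiting for it to finish.
>>     pending.add(Future { rdd.foreach(_ => ()) /* real action here */ })
>>   }
>> }
>>
>> // Registered last, so it runs after all the submissions in each batch.
>> anyOfTheManyDStreams.foreachRDD { _ =>
>>   var f = pending.poll()
>>   while (f != null) { Await.ready(f, 10.minutes); f = pending.poll() }
>> }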
>>
>> On Tue, Jul 28, 2015 at 4:05 PM, Brandon White 
>> wrote:
>>
>>> Thank you Tathagata. My main use case for the 500 streams is to append
>>> new elements into their corresponding Spark SQL tables. Every stream is
>>> mapped to a table, so I'd like to use the streams to append the new RDDs
>>> to the tables. If I union all the streams, appending new elements becomes
>>> a nightmare. So is there no other way to parallelize something like the
>>> following? Will this still run sequentially, or time out?
>>>
>>> // 500 streams, kept as (tableName, stream) pairs
>>> streams.foreach { case (tableName, stream) =>
>>>   stream.foreachRDD { rdd =>
>>>     val df = sqlContext.jsonRDD(rdd)
>>>     df.saveAsTable(tableName, SaveMode.Append)
>>>   }
>>> }
>>>
>>> On Tue, Jul 28, 2015 at 3:42 PM, Tathagata Das 
>>> wrote:
>>>
>>>> I don't think anyone has really run 500 text streams.
>>>> The par sequences do nothing here: you are only parallelizing the
>>>> setup code, which does not really compute anything. It also sets up 500
>>>> foreachRDD operations that will get executed sequentially in each batch,
>>>> which does not make sense. The right way to parallelize this is to
>>>> union all the streams.
>>>>
>>>> val streams = streamPaths.map { path =>
>>>>   ssc.textFileStream(path)
>>>> }
>>>> val unionedStream = streamingContext.union(streams)
>>>> unionedStream.foreachRDD { rdd =>
>>>>   // do something
>>>> }
>>>>
>>>> Then there is only one foreachRDD executed in every batch, and it will
>>>> process all the new files in each batch interval in parallel.
>>>> TD
>>>>
>>>>
>>>> On Tue, Jul 28, 2015 at 3:06 PM, Brandon White wrote:
>>>>
>>>>> val ssc = new StreamingContext(sc, Minutes(10))
>>>>>
>>>>> //500 textFile streams watching S3 directories
>>>>> val streams = streamPaths.par.map { path =>
>>>>>   ssc.textFileStream(path)
>>>>> }
>>>>>
>>>>> streams.par.foreach { stream =>
>>>>>   stream.foreachRDD { rdd =>
>>>>> //do something
>>>>>   }
>>>>> }
>>>>>
>>>>> ssc.start()
>>>>>
>>>>> Would something like this scale? What would be the limiting factor to
>>>>> performance? What is the best way to parallelize this? Any other ideas on
>>>>> design?
>>>>>
>>>>
>>>>
>>>
>>
>


-- 
Thanks & Regards,
Ashwin Giridharan


Re: What happens when you create more DStreams than nodes in the cluster?

2015-07-31 Thread Ashwin Giridharan
@Brandon Each node can host multiple executors. For example, in a 15-node
cluster where each node runs a NodeManager (in YARN) or its equivalent
(in Mesos or standalone mode) and has enough resources to host, say, 5
executors, you can have 15*5 = 75 executors in total, and each of these
executors can host a DStream receiver.

But be aware that each DStream receiver uses a dedicated core: "the
number of cores allocated to the Spark Streaming application must be more
than the number of receivers. Otherwise the system will receive data, but
not be able to process it."
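
Concretely (sketch with illustrative numbers; socket streams stand in
for whatever receiver-based source you use):

// 30 receiver-based input streams pin 30 cores for their receivers.
val streams = (1 to 30).map(i => ssc.socketTextStream(s"host$i", 9999))
// Submitted with e.g. --num-executors 15 --executor-cores 5, the app
// has 75 cores: 30 run receivers, leaving 45 to process the batches.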

Thanks,
Ashwin

On Fri, Jul 31, 2015 at 4:52 PM, Brandon White 
wrote:

> Since one input DStream creates one receiver, and one receiver uses one
> executor / node:
>
> What happens if you create more DStreams than nodes in the cluster?
>
> Say I have 30 DStreams on a 15-node cluster.
>
> Will ~15 streams get assigned to the ~15 executors / nodes while the
> other ~15 streams are queued for resources, or will the extra streams
> just fail and never run?
>



-- 
Thanks & Regards,
Ashwin Giridharan


Re: How to control Spark Executors from getting Lost when using YARN client mode?

2015-07-30 Thread Ashwin Giridharan
What is your cluster configuration (size and resources)?

If you do not have enough resources, your executors will not run.
Moreover, allocating 8 cores to a single executor is too much.

If you have a cluster with four nodes running NodeManagers, each equipped
with 4 cores and 8GB of memory, then a sensible configuration would be

--num-executors 8 --executor-cores 2 --executor-memory 2G

i.e. two executors per node, each with 2 cores and a 2GB heap, which
leaves memory headroom on every node for the OS, the NodeManager, and
YARN's per-executor overhead.
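
Equivalently in code (sketch; Spark 1.x property names on YARN):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.instances", "8")  // --num-executors
  .set("spark.executor.cores", "2")      // --executor-cores
  .set("spark.executor.memory", "2g")    // --executor-memory

Keep in mind that YARN grants each executor its heap plus
spark.yarn.executor.memoryOverhead (by default roughly 10% of executor
memory, with a 384MB floor), so the 2g requests still fit comfortably on
8GB nodes.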

Thanks,
Ashwin

On Thu, Jul 30, 2015 at 12:08 PM, unk1102  wrote:

> Hi, I have one Spark job which runs fine locally with less data, but when
> I schedule it on YARN I keep getting the following ERROR, and slowly all
> executors get removed from the UI and my job fails:
>
> 15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 8 on
> myhost1.com: remote Rpc client disassociated
> 15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 6 on
> myhost2.com: remote Rpc client disassociated
> I use the following command to submit the Spark job in yarn-client mode:
>
>  ./spark-submit --class com.xyz.MySpark --conf
> "spark.executor.extraJavaOptions=-XX:MaxPermSize=512M"
> --driver-java-options
> -XX:MaxPermSize=512m --driver-memory 3g --master yarn-client
> --executor-memory 2G --executor-cores 8 --num-executors 12
> /home/myuser/myspark-1.0.jar
>
> I don't know what the problem is, please guide me. I am new to Spark.
> Thanks in advance.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-control-Spark-Executors-from-getting-Lost-when-using-YARN-client-mode-tp24084.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


-- 
Thanks & Regards,
Ashwin Giridharan


Re: Has anybody ever tried running Spark Streaming on 500 text streams?

2015-07-28 Thread Ashwin Giridharan
@Das, is there any way to identify a Kafka topic when we have a unified
stream? As of now, I create a dedicated DStream for each topic and use
foreachRDD on each of these streams. If I have, say, 100 Kafka topics,
how can I use a unified stream and still take topic-specific actions
inside foreachRDD?
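
Something like the following is the shape I am after (rough sketch;
zkQuorum and groupId are placeholders):

import org.apache.spark.streaming.kafka.KafkaUtils

// One receiver-based stream per topic, each record tagged with its topic:
val tagged = topics.map { topic =>
  KafkaUtils.createStream(ssc, zkQuorum, groupId, Map(topic -> 1))
    .map { case (_, msg) => (topic, msg) }
}
ssc.union(tagged.toSeq).foreachRDD { rdd =>
  // branch on the topic tag here
}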

On Tue, Jul 28, 2015 at 6:42 PM, Tathagata Das  wrote:

> I don't think anyone has really run 500 text streams.
> The par sequences do nothing here: you are only parallelizing the
> setup code, which does not really compute anything. It also sets up 500
> foreachRDD operations that will get executed sequentially in each batch,
> which does not make sense. The right way to parallelize this is to union
> all the streams.
>
> val streams = streamPaths.map { path =>
>   ssc.textFileStream(path)
> }
> val unionedStream = streamingContext.union(streams)
> unionedStream.foreachRDD { rdd =>
>   // do something
> }
>
> Then there is only one foreachRDD executed in every batch, and it will
> process all the new files in each batch interval in parallel.
> TD
>
>
> On Tue, Jul 28, 2015 at 3:06 PM, Brandon White 
> wrote:
>
>> val ssc = new StreamingContext(sc, Minutes(10))
>>
>> //500 textFile streams watching S3 directories
>> val streams = streamPaths.par.map { path =>
>>   ssc.textFileStream(path)
>> }
>>
>> streams.par.foreach { stream =>
>>   stream.foreachRDD { rdd =>
>> //do something
>>   }
>> }
>>
>> ssc.start()
>>
>> Would something like this scale? What would be the limiting factor to
>> performance? What is the best way to parallelize this? Any other ideas on
>> design?
>>
>
>


-- 
Thanks & Regards,
Ashwin Giridharan


Re: Long running streaming application - worker death

2015-07-26 Thread Ashwin Giridharan
Hi Aviemzur,

As of now the Spark workers just run indefinitely in a loop, regardless of
whether the data source (Kafka) is alive or has lost its connection,
because they simply read ZooKeeper for the offsets of the data to be
consumed. So when your DStream receiver is lost, it's LOST!

As mentioned here
(http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/):

"One crude workaround is to restart your streaming application whenever it
runs into an upstream data source failure or a receiver failure. This
workaround may not help you though if your use case requires you to set the
Kafka configuration option auto.offset.reset to “smallest” – because of a
known bug in Spark Streaming the resulting behavior of your streaming
application may not be what you want."

The JIRA ticket corresponding to this bug,
https://spark-project.atlassian.net/browse/SPARK-1340, is yet to be
resolved.

Also have a look at
http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-and-the-spark-shell-td3347.html
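
Until the receivers become self-healing, one thing you can do is detect
the loss yourself. A minimal sketch (assumes you trigger the actual
restart out-of-band once this fires):

import org.apache.spark.streaming.scheduler._

ssc.addStreamingListener(new StreamingListener {
  override def onReceiverStopped(stopped: StreamingListenerReceiverStopped): Unit = {
    // A stopped receiver will not come back on its own:
    // log, alert, or trigger an application restart here.
  }
  override def onReceiverError(error: StreamingListenerReceiverError): Unit = {
    // Same handling for receiver errors.
  }
})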

Thanks,
Ashwin


On Sun, Jul 26, 2015 at 9:29 AM, aviemzur  wrote:

> Hi all,
>
> I have a question about long-running streaming applications and workers
> that act as consumers.
>
> Specifically, my program runs on a Spark standalone cluster with a small
> number of workers acting as Kafka consumers using Spark Streaming.
>
> What I noticed was that in a long-running application, if one of the
> workers dies for some reason and a new worker registers to replace it, we
> have effectively lost that worker as a consumer.
>
> When the driver first runs, I create a configured number of
> KafkaInputDStream instances (in my case, the same as the number of
> workers in the cluster), and Spark distributes these among the workers,
> so each one of my workers consumes from Kafka.
>
> I then unify the streams into a single stream using
> StreamingContext.union.
>
> This code never runs again, though, and there is no code that monitors
> that we have X consumers at all times.
>
> So when a worker dies, we effectively lose a consumer and never create a
> new one, and then the lag in Kafka starts growing.
>
> Does anybody have a solution / ideas regarding this issue?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Long-running-streaming-application-worker-death-tp23997.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


-- 
Thanks & Regards,
Ashwin Giridharan