Re: Question about MEMORY_AND_DISK persistence
Hi Vishnu,

A partition will either be entirely in memory or entirely on disk. Each cached partition is stored as a single block, and Spark does not split one partition between memory and disk. In your example, the whole 2nd partition would be kept on disk.

-Ashwin

On Feb 28, 2016 15:09, "Vishnu Viswanath" wrote:
> Hi All,
>
> I have a question regarding Persistence (MEMORY_AND_DISK)
>
> Suppose I am trying to persist an RDD which has 2 partitions and only 1
> partition can be fit in memory completely but some part of partition 2 can
> also be fit, will spark keep the portion of partition 2 in memory and rest
> in disk, or will the whole 2nd partition be kept in disk.
>
> Regards,
> Vishnu
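To make the all-or-nothing behavior concrete, here is a toy model in Scala. It is a simplification under stated assumptions, not Spark's actual storage code: partition sizes and the memory budget are hypothetical numbers in one arbitrary unit, partitions are considered in order, and eviction of other cached blocks is ignored. A partition goes to memory only if it fits entirely in what is left; otherwise all of it goes to disk.

```scala
// Toy model (an assumption, not Spark's BlockManager): each cached partition is
// one block, so under MEMORY_AND_DISK it lands wholly in memory or wholly on disk.
object MemoryAndDiskModel {
  sealed trait Location
  case object InMemory extends Location
  case object OnDisk extends Location

  /** Places partitions in order; sizes and budget use the same hypothetical unit. */
  def place(partitionSizes: Seq[Long], memoryBudget: Long): Seq[Location] =
    partitionSizes.foldLeft((memoryBudget, Vector.empty[Location])) {
      case ((remaining, placed), size) =>
        if (size <= remaining) (remaining - size, placed :+ InMemory) // fits whole
        else (remaining, placed :+ OnDisk) // no partial fit: all of it goes to disk
    }._2

  def main(args: Array[String]): Unit = {
    // Vishnu's scenario: partition 1 fits; only part of partition 2 would fit.
    println(place(Seq(60L, 60L), memoryBudget = 100L)) // Vector(InMemory, OnDisk)
  }
}
```

In the two-partition scenario the second partition would only partly fit, so the model sends all of it to disk, which matches the answer above.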
Re: Has anybody ever tried running Spark Streaming on 500 text streams?
Thanks a lot @Das @Cody. I moved from the receiver-based stream to the direct stream, and I can get the topics from the offsets!!

On Fri, Jul 31, 2015 at 4:41 PM, Brandon White wrote:
> Tathagata,
>
> Could the bottleneck possibly be the number of executor nodes in our
> cluster? Since we are creating 500 DStreams based off 500 text file
> directories, do we need at least 500 executors / nodes to be receivers for
> each one of the streams?
>
> On Tue, Jul 28, 2015 at 6:09 PM, Tathagata Das wrote:
>
>> @Ashwin: You could append the topic in the data.
>>
>> val kafkaStreams = topics.map { topic =>
>>   KafkaUtils.createDirectStream(topic...).map { x => (x, topic) }
>> }
>> val unionedStream = context.union(kafkaStreams)
>>
>>
>> @Brandon:
>> I don't recommend it, but you could do something crazy like use the
>> foreachRDD to farm out the jobs to a thread pool, but the final foreachRDD
>> waits for all the jobs to complete.
>>
>> manyDStreams.foreach { dstream =>
>>   dstream.foreachRDD { rdd =>
>>     // Add runnable that runs the job on RDD to threadpool
>>     // This does not wait for the job to finish
>>   }
>> }
>>
>> anyOfTheManyDStreams.foreachRDD { _ =>
>>   // wait for all the current batch's jobs in the threadpool to complete.
>> }
>>
>>
>> This would run all the Spark jobs in the batch in parallel in a thread
>> pool, but it would also make sure all the jobs finish before the batch is
>> marked as completed.
>>
>> On Tue, Jul 28, 2015 at 4:05 PM, Brandon White wrote:
>>
>>> Thank you Tathagata. My main use case for the 500 streams is to append
>>> new elements into their corresponding Spark SQL tables. Every stream is
>>> mapped to a table so I'd like to use the streams to append the new RDDs
>>> to the table. If I union all the streams, appending new elements becomes a
>>> nightmare. So there is no other way to parallelize something like the
>>> following? Will this still run sequentially or time out?
>>>
>>> //500 streams
>>> streams.foreach { stream =>
>>>   stream.foreachRDD { rdd =>
>>>     val df = sqlContext.jsonRDD(rdd)
>>>     df.saveAsTable(streamTuple._1, SaveMode.Append)
>>>   }
>>> }
>>>
>>> On Tue, Jul 28, 2015 at 3:42 PM, Tathagata Das wrote:
>>>
>>>> I don't think anyone has really run 500 text streams.
>>>> And the .par sequences do nothing there; you are only parallelizing the
>>>> setup code, which does not really compute anything. Also it sets up 500
>>>> foreachRDD operations that will get executed in each batch sequentially, so
>>>> it does not make sense. The right way to parallelize this is to union all
>>>> the streams.
>>>>
>>>> val streams = streamPaths.map { path =>
>>>>   ssc.textFileStream(path)
>>>> }
>>>> val unionedStream = ssc.union(streams)
>>>> unionedStream.foreachRDD { rdd =>
>>>>   // do something
>>>> }
>>>>
>>>> Then there is only one foreachRDD executed in every batch that will
>>>> process in parallel all the new files in each batch interval.
>>>> TD
>>>>
>>>>
>>>> On Tue, Jul 28, 2015 at 3:06 PM, Brandon White wrote:
>>>>
>>>>> val ssc = new StreamingContext(sc, Minutes(10))
>>>>>
>>>>> //500 textFile streams watching S3 directories
>>>>> val streams = streamPaths.par.map { path =>
>>>>>   ssc.textFileStream(path)
>>>>> }
>>>>>
>>>>> streams.par.foreach { stream =>
>>>>>   stream.foreachRDD { rdd =>
>>>>>     //do something
>>>>>   }
>>>>> }
>>>>>
>>>>> ssc.start()
>>>>>
>>>>> Would something like this scale? What would be the limiting factor to
>>>>> performance? What is the best way to parallelize this? Any other ideas on
>>>>> design?

--
Thanks & Regards,
Ashwin Giridharan
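Tathagata's "farm out, then wait" idea (which he explicitly does not recommend) can be sketched without Spark as a thread pool plus a barrier. In this sketch each per-stream job is a hypothetical `() => String` standing in for the action run on one stream's RDD in a batch (for example, a per-table append); `runBatch` submits every job first and then blocks on all the futures, so it returns only once the whole batch is done.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors, Future, TimeUnit}

// Sketch of "submit per-stream jobs to a pool, then wait for all of them".
// Jobs are hypothetical () => String stand-ins for per-RDD Spark actions.
object BatchFanOut {
  /** Submits every job to the pool, then blocks until all have finished. */
  def runBatch(pool: ExecutorService, jobs: Seq[() => String]): Seq[String] = {
    // toVector forces all submissions to happen before any waiting starts.
    val futures: Vector[Future[String]] = jobs.toVector.map { job =>
      pool.submit(new Callable[String] { def call(): String = job() })
    }
    // Barrier: get() blocks per job (and rethrows a job's failure wrapped in
    // ExecutionException), so the batch is complete only when every job is.
    futures.map(_.get())
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(4)
    try {
      // Hypothetical per-table append jobs for one batch.
      val jobs = (1 to 8).map(i => () => s"table-$i appended")
      runBatch(pool, jobs).foreach(println) // printed in submission order
    } finally {
      pool.shutdown()
      pool.awaitTermination(10, TimeUnit.SECONDS)
    }
  }
}
```

The pool size bounds how many per-stream jobs run at once, and the blocking `get()` calls play the role of the final `foreachRDD` that waits for the batch.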
Re: What happens when you create more DStreams then nodes in the cluster?
@Brandon

Each node can host multiple executors. For example, in a 15-node cluster where a NodeManager (in YARN) or its equivalent (in Mesos or Standalone mode) runs on each node, if each node has enough resources to host, say, 5 executors, then in total you can have 15 * 5 = 75 executors, and each of these executors can host a DStream receiver.

But be aware that each DStream receiver occupies a dedicated core: "the number of cores allocated to the Spark Streaming application must be more than the number of receivers. Otherwise the system will receive data, but not be able to process it."

Thanks,
Ashwin

On Fri, Jul 31, 2015 at 4:52 PM, Brandon White wrote:
> Since one input dstream creates one receiver and one receiver uses one
> executor / node.
>
> What happens if you create more Dstreams than nodes in the cluster?
>
> Say I have 30 Dstreams on a 15 node cluster.
>
> Will ~10 streams get assigned to ~10 executors / nodes then the other ~20
> streams will be queued for resources or will the other streams just fail
> and never run?

--
Thanks & Regards,
Ashwin Giridharan
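The arithmetic in this reply can be written down as a quick capacity check. This is a back-of-the-envelope sketch, assuming one core per executor (a hypothetical figure) and reusing the thread's numbers (15 nodes, 5 executors per node, 30 DStreams). It only checks the quoted rule that total cores must exceed the number of receivers; it does not model how Spark actually places receivers on executors.

```scala
// Back-of-the-envelope check for receiver-based streams. The cluster numbers
// are the hypothetical ones from the thread, not measurements.
object ReceiverCapacity {
  final case class Cluster(nodes: Int, executorsPerNode: Int, coresPerExecutor: Int) {
    def executors: Int = nodes * executorsPerNode
    def totalCores: Int = executors * coresPerExecutor
  }

  /** Each receiver pins one core, so processing needs strictly more cores than receivers. */
  def canProcess(cluster: Cluster, receivers: Int): Boolean =
    cluster.totalCores > receivers

  /** Cores left for batch processing once every receiver holds its core. */
  def processingCores(cluster: Cluster, receivers: Int): Int =
    math.max(0, cluster.totalCores - receivers)

  def main(args: Array[String]): Unit = {
    val cluster = Cluster(nodes = 15, executorsPerNode = 5, coresPerExecutor = 1)
    println(s"executors=${cluster.executors}, cores=${cluster.totalCores}") // executors=75, cores=75
    println(canProcess(cluster, receivers = 30)) // true
    println(processingCores(cluster, receivers = 30)) // 45
  }
}
```

With 75 cores, 30 receivers leave 45 cores for processing; at 75 or more receivers the check fails, which is the "receive data, but not be able to process it" case.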
Re: How to control Spark Executors from getting Lost when using YARN client mode?
What is your cluster configuration (size and resources)? If you do not have enough resources, then your executors will not run. Moreover, allocating 8 cores to a single executor is too much. If you have a cluster with four nodes running NodeManagers, each equipped with 4 cores and 8GB of memory, then a reasonable configuration would be:

--num-executors 8 --executor-cores 2 --executor-memory 2G

That is two executors per node, which uses all 16 cores while leaving memory headroom on each node for YARN's per-executor memory overhead.

Thanks,
Ashwin

On Thu, Jul 30, 2015 at 12:08 PM, unk1102 wrote:
> Hi, I have one Spark job which runs fine locally with less data, but when I
> schedule it on YARN to execute I keep on getting the following ERROR and
> slowly all executors get removed from the UI and my job fails:
>
> 15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 8 on myhost1.com: remote Rpc client disassociated
> 15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 6 on myhost2.com: remote Rpc client disassociated
>
> I use the following command to schedule the Spark job in yarn-client mode:
>
> ./spark-submit --class com.xyz.MySpark \
>   --conf "spark.executor.extraJavaOptions=-XX:MaxPermSize=512M" \
>   --driver-java-options -XX:MaxPermSize=512m --driver-memory 3g \
>   --master yarn-client \
>   --executor-memory 2G --executor-cores 8 --num-executors 12 \
>   /home/myuser/myspark-1.0.jar
>
> I don't know what the problem is; please guide me. I am new to Spark.
> Thanks in advance.
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-control-Spark-Executors-from-getting-Lost-when-using-YARN-client-mode-tp24084.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org

--
Thanks & Regards,
Ashwin Giridharan
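The sizing suggestion follows from simple division. The sketch below assumes identical nodes and counts only executor cores and `--executor-memory`; YARN's per-executor memory overhead and the driver are left out, so the memory bound is optimistic. The cluster shape (4 nodes, 4 cores, 8 GB each) is the hypothetical one from the reply.

```scala
// Sizing arithmetic behind the suggested spark-submit flags. Assumes identical
// nodes; ignores YARN memory overhead and the driver (so memory is optimistic).
object ExecutorSizing {
  final case class Node(cores: Int, memoryGb: Int)
  final case class Plan(numExecutors: Int, executorCores: Int, executorMemoryGb: Int)

  /** Executors per node are bounded by whichever runs out first: cores or memory. */
  def plan(nodes: Int, node: Node, executorCores: Int, executorMemoryGb: Int): Plan = {
    val perNodeByCores = node.cores / executorCores
    val perNodeByMemory = node.memoryGb / executorMemoryGb
    val perNode = math.min(perNodeByCores, perNodeByMemory)
    Plan(nodes * perNode, executorCores, executorMemoryGb)
  }

  def flags(p: Plan): String =
    s"--num-executors ${p.numExecutors} --executor-cores ${p.executorCores} --executor-memory ${p.executorMemoryGb}G"

  def main(args: Array[String]): Unit = {
    val p = plan(nodes = 4, node = Node(cores = 4, memoryGb = 8), executorCores = 2, executorMemoryGb = 2)
    println(flags(p)) // --num-executors 8 --executor-cores 2 --executor-memory 2G
  }
}
```

With the original `--executor-cores 8`, the same function yields zero executors per 4-core node, which is how the "not enough resources" case shows up in this model (whether YARN actually refuses such containers depends on its scheduler configuration).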
Re: Has anybody ever tried running Spark Streaming on 500 text streams?
@Das, is there any way to identify a Kafka topic when we have a unified stream? As of now, for each topic I create a dedicated DStream and use foreachRDD on each of these streams. If I have, say, 100 Kafka topics, how can I use a unified stream and still take topic-specific actions inside foreachRDD?

On Tue, Jul 28, 2015 at 6:42 PM, Tathagata Das wrote:
> I don't think anyone has really run 500 text streams.
> And the .par sequences do nothing there; you are only parallelizing the
> setup code, which does not really compute anything. Also it sets up 500
> foreachRDD operations that will get executed in each batch sequentially, so
> it does not make sense. The right way to parallelize this is to union all
> the streams.
>
> val streams = streamPaths.map { path =>
>   ssc.textFileStream(path)
> }
> val unionedStream = ssc.union(streams)
> unionedStream.foreachRDD { rdd =>
>   // do something
> }
>
> Then there is only one foreachRDD executed in every batch that will
> process in parallel all the new files in each batch interval.
> TD
>
>
> On Tue, Jul 28, 2015 at 3:06 PM, Brandon White wrote:
>
>> val ssc = new StreamingContext(sc, Minutes(10))
>>
>> //500 textFile streams watching S3 directories
>> val streams = streamPaths.par.map { path =>
>>   ssc.textFileStream(path)
>> }
>>
>> streams.par.foreach { stream =>
>>   stream.foreachRDD { rdd =>
>>     //do something
>>   }
>> }
>>
>> ssc.start()
>>
>> Would something like this scale? What would be the limiting factor to
>> performance? What is the best way to parallelize this? Any other ideas on
>> design?

--
Thanks & Regards,
Ashwin Giridharan
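One answer, which Tathagata gives elsewhere in this thread, is to append the topic to each record before the union. The sketch below models that with plain Scala collections rather than DStreams: the per-topic batches and handlers are hypothetical, `tag` mirrors `.map { x => (x, topic) }`, and `processUnioned` makes one pass over the unioned data, grouping by the tag so each topic still gets its own action. In a real streaming job that split would happen inside the single `foreachRDD`, for example by filtering the RDD on the topic tag.

```scala
// Plain-Scala model of "tag each record with its topic, then union". The
// batches and handlers are hypothetical in-memory stand-ins for DStreams.
object TopicTagging {
  type Record = String

  /** Pairs every record with the topic it came from (like `.map(x => (x, topic))`). */
  def tag(topic: String, batch: Seq[Record]): Seq[(Record, String)] =
    batch.map(record => (record, topic))

  /** One "union" of all tagged batches, then per-topic handling in a single pass. */
  def processUnioned(
      batches: Map[String, Seq[Record]],
      handlers: Map[String, Seq[Record] => Int]): Map[String, Int] = {
    // "Union": one flat collection of (record, topic) pairs from every topic.
    val unioned: Seq[(Record, String)] =
      batches.toSeq.flatMap { case (topic, batch) => tag(topic, batch) }
    // Split back out by the topic tag and run that topic's action.
    unioned
      .groupBy(_._2)
      .map { case (topic, tagged) =>
        // A missing handler throws NoSuchElementException in this sketch.
        (topic, handlers(topic)(tagged.map(_._1)))
      }
  }

  def main(args: Array[String]): Unit = {
    val batches = Map("clicks" -> Seq("c1", "c2"), "orders" -> Seq("o1"))
    val rowsWritten: Seq[Record] => Int = _.size // stand-in for a per-topic append
    println(processUnioned(batches, Map("clicks" -> rowsWritten, "orders" -> rowsWritten)))
  }
}
```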
Re: Long running streaming application - worker death
Hi Aviemzur,

As of now, the Spark workers just run indefinitely in a loop, irrespective of whether the data source (Kafka) is active or has lost its connection, because they only read ZooKeeper for the offset of the data to be consumed. So when your DStream receiver is lost, it's LOST!

As mentioned here (http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/):

"One crude workaround is to restart your streaming application whenever it runs into an upstream data source failure or a receiver failure. This workaround may not help you though if your use case requires you to set the Kafka configuration option auto.offset.reset to “smallest” – because of a known bug in Spark Streaming the resulting behavior of your streaming application may not be what you want."

The JIRA corresponding to this bug (https://spark-project.atlassian.net/browse/SPARK-1340) is yet to be resolved.

Also have a look at http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-and-the-spark-shell-td3347.html

Thanks,
Ashwin

On Sun, Jul 26, 2015 at 9:29 AM, aviemzur wrote:
> Hi all,
>
> I have a question about long running streaming applications and workers
> that act as consumers.
>
> Specifically, my program runs on a Spark standalone cluster with a small
> number of workers, acting as Kafka consumers using Spark Streaming.
>
> What I noticed was that in a long running application, if one of the
> workers dies for some reason and then a new worker registers to replace it,
> we have effectively lost that worker as a consumer.
>
> When the driver first runs, I create a configured number of
> KafkaInputDStream instances, in my case the same number as the number of
> workers in the cluster, and Spark distributes these among the workers, so
> each one of my workers consumes from Kafka.
>
> I then unify the streams to a single stream using SparkStreamingContext
> union.
>
> This code never runs again though, and there is no code that monitors that
> we have X number of consumers at all times.
>
> So when a worker dies, we effectively lose a consumer and never create a
> new one, and then the lag in Kafka starts growing.
>
> Does anybody have a solution / ideas regarding this issue?
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Long-running-streaming-application-worker-death-tp23997.html

--
Thanks & Regards,
Ashwin Giridharan
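The "crude workaround" quoted above, restarting the application whenever it fails, can be sketched as a small supervisor loop. Here `runApp` is a hypothetical stand-in for starting the streaming application and blocking until it stops; since a stopped StreamingContext cannot be restarted, each attempt would need to build a fresh one (or the restart could be left to an external process manager). This does nothing about the auto.offset.reset caveat mentioned in the quote.

```scala
import scala.util.{Failure, Success, Try}

// Sketch of "restart the app whenever it fails". `runApp` is hypothetical: it
// receives the attempt number, runs the app, and throws if the app dies.
object RestartSupervisor {
  /** Runs `runApp` until it completes normally or `maxRestarts` restarts are used up. */
  def supervise(maxRestarts: Int, attempt: Int = 1)(runApp: Int => Unit): Int =
    Try(runApp(attempt)) match {
      case Success(_) => attempt // finished cleanly; report how many runs it took
      case Failure(e) if attempt <= maxRestarts =>
        System.err.println(s"run $attempt failed: ${e.getMessage}; restarting")
        supervise(maxRestarts, attempt + 1)(runApp)
      case Failure(e) => throw e // out of restarts: surface the last failure
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical app whose receiver is lost on the first two runs.
    val runs = supervise(maxRestarts = 3) { attempt =>
      if (attempt < 3) throw new RuntimeException("receiver lost")
      println(s"run $attempt completed")
    }
    println(s"succeeded after $runs runs")
  }
}
```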