Re: How can i remove the need for calling cache
Thanks Vadim. Yes, this is a good option for us. Thanks.

From: Vadim Semenov
Sent: Wednesday, August 2, 2017 6:24:40 PM
To: Suzen, Mehmet
Cc: jeff saremi; user@spark.apache.org
Subject: Re: How can i remove the need for calling cache
Re: How can i remove the need for calling cache
So if you just save an RDD to HDFS via 'saveAsSequenceFile', you would have to create a new RDD that reads that data back; this way you avoid recomputing the RDD, but you may lose time on saving/loading. Exactly the same thing happens with 'checkpoint': 'checkpoint' is basically just a convenience method that gives you the same RDD back.

However, if your job fails, there's no way for a new job to reuse the already-checkpointed data from a previous failed run. That's where having a custom checkpointer helps. Another note: you cannot delete checkpointed data from within the same job; you have to delete it some other way.

BTW, have you tried '.persist(StorageLevel.DISK_ONLY)'? It caches data to local disk, freeing up space in the JVM and letting you avoid HDFS.
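A minimal sketch of the '.persist(StorageLevel.DISK_ONLY)' suggestion, assuming a spark-shell session where `sc` is the SparkContext (the RDD and its transformations are illustrative stand-ins):

```
import org.apache.spark.storage.StorageLevel

// Keep the DAG, but store computed partitions on the executors' local
// disks instead of the JVM heap, avoiding both memory pressure and HDFS.
val myrdd = sc.parallelize(1 to 1000000).map(_.toString)
myrdd.persist(StorageLevel.DISK_ONLY)

myrdd.count()  // first action computes the RDD and spills it to local disk
myrdd.take(5)  // later actions read the stored partitions back from disk
```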
Re: How can i remove the need for calling cache
On 3 August 2017 at 03:00, Vadim Semenov wrote:
> `saveAsObjectFile` doesn't save the DAG, it acts as a typical action, so it
> just saves data to some destination.

Yes, that's what I thought, so the statement "...otherwise saving it on a file will require recomputation." from the book is not entirely true.
Re: How can i remove the need for calling cache
`saveAsObjectFile` doesn't save the DAG; it acts as a typical action, so it just saves data to some destination.

`cache`/`persist` let you cache data and keep the DAG, so if an executor that holds some of the data goes down, Spark is still able to recalculate the missing partitions.

`localCheckpoint` lets you sacrifice fault tolerance and truncate the DAG, so if an executor goes down, the job will fail, because the DAG has already been forgotten:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1551-L1610

And `checkpoint` lets you save data to some shared storage and truncate the DAG, so if an executor goes down, the job can fetch the missing partitions from the place where the RDD was saved:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1533-L1549
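A hedged sketch contrasting the two checkpoint flavors, assuming a spark-shell session where `sc` is the SparkContext (the checkpoint directory and sample data are illustrative):

```
// `localCheckpoint` truncates the DAG but stores blocks only on the
// executors: fast, not fault-tolerant.
val fast = sc.parallelize(1 to 100).map(_ * 2)
fast.localCheckpoint()
fast.count()  // materializes the local checkpoint

// `checkpoint` truncates the DAG and writes to shared storage: it
// survives executor loss.
sc.setCheckpointDir("hdfs:///tmp/checkpoints")
val durable = sc.parallelize(1 to 100).map(_ * 2)
durable.checkpoint()
durable.count()  // checkpoint() is lazy; an action triggers the save
```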
Re: How can i remove the need for calling cache
Also check the `RDD.checkpoint()` method:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1533-L1550
Re: How can i remove the need for calling cache
I'm not sure that "checkpointed" means the same thing in that sentence.

You can run a simple test using `spark-shell`:

sc.setCheckpointDir("/tmp/checkpoint")
val rdd = sc.parallelize(1 to 10).map(x => {
  Thread.sleep(1000)
  x
})
rdd.checkpoint()
rdd.foreach(println) // Will take 10 seconds
rdd.foreach(println) // Will be instant, because the RDD is checkpointed
Re: How can i remove the need for calling cache
On 3 August 2017 at 01:05, jeff saremi wrote:
> Vadim:
>
> This is from the Mastering Spark book:
>
> "It is strongly recommended that a checkpointed RDD is persisted in memory,
> otherwise saving it on a file will require recomputation."

Is this really true? I had the impression that the DAG would not be carried along once an RDD is serialized to an external file; does 'saveAsObjectFile' save the DAG as well?
Re: How can i remove the need for calling cache
Vadim:

This is from the Mastering Spark book:

"It is strongly recommended that a checkpointed RDD is persisted in memory, otherwise saving it on a file will require recomputation."

To me that means checkpoint will not prevent the recomputation that I was hoping for.

From: Vadim Semenov
Sent: Tuesday, August 1, 2017 12:05:17 PM
To: jeff saremi
Cc: user@spark.apache.org
Subject: Re: How can i remove the need for calling cache
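For context on the book's recommendation, a hedged sketch of the persist-before-checkpoint pattern (assuming `sc` is a SparkContext; the directory and `expensiveOp` are illustrative): since `checkpoint()` writes the data in a separate job, persisting first lets that job read the cached blocks instead of recomputing the lineage.

```
import org.apache.spark.storage.StorageLevel

def expensiveOp(x: Int): Int = { Thread.sleep(1); x } // stand-in work

sc.setCheckpointDir("hdfs:///tmp/checkpointDirectory")
val myrdd = sc.parallelize(1 to 1000).map(expensiveOp)

myrdd.persist(StorageLevel.MEMORY_ONLY) // cache first...
myrdd.checkpoint()                      // ...so the checkpoint job reuses the cached blocks
myrdd.count()                           // lineage runs once; checkpoint reads from the cache
```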
Re: How can i remove the need for calling cache
Thanks Mark. I'll examine the status more carefully to observe this.

From: Mark Hamstra
Sent: Tuesday, August 1, 2017 11:25:46 AM
To: user@spark.apache.org
Subject: Re: How can i remove the need for calling cache
Re: How can i remove the need for calling cache
Thanks Vadim. I'll try that.

From: Vadim Semenov
Sent: Tuesday, August 1, 2017 12:05:17 PM
To: jeff saremi
Cc: user@spark.apache.org
Subject: Re: How can i remove the need for calling cache
Re: How can i remove the need for calling cache
You can use `.checkpoint()`:

```
val sc: SparkContext
sc.setCheckpointDir("hdfs:///tmp/checkpointDirectory")
myrdd.checkpoint()
val result1 = myrdd.map(op1(_))
result1.count() // Will save `myrdd` to HDFS and do map(op1…
val result2 = myrdd.map(op2(_))
result2.count() // Will load `myrdd` from HDFS and do map(op2…
```
Re: How can i remove the need for calling cache
Here are the threads that talk about the problems we're experiencing. These problems are exacerbated when we use cache/persist:

https://www.mail-archive.com/user@spark.apache.org/msg64987.html
https://www.mail-archive.com/user@spark.apache.org/msg64986.html

So I am looking for a way to reproduce the same effect as in my sample code without the use of cache(). If I use myrdd.count(), would that be a good alternative?

thanks

From: lucas.g...@gmail.com
Sent: Tuesday, August 1, 2017 11:23:04 AM
To: jeff saremi
Cc: user@spark.apache.org
Subject: Re: How can i remove the need for calling cache
Re: How can i remove the need for calling cache
Very likely, much of the potential duplication is already being avoided even without calling cache/persist. When running the above code without `myrdd.cache`, have you looked at the Spark web UI for the Jobs? For at least one of them you will likely see that many Stages are marked as "skipped", which means that prior shuffle files that cover the results of those Stages were still available, so Spark did not recompute those results. Spark will eventually clean up those shuffle files (unless you hold onto a reference to them), but if your Jobs using myrdd run fairly close together in time, then duplication is already minimized even without an explicit cache call.
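A quick way to observe this, as a rough sketch assuming a spark-shell session (data and URL are illustrative): the `reduceByKey` below introduces a shuffle, and in the web UI (typically http://localhost:4040) the second action's upstream stage shows as "skipped" because the shuffle output is reused.

```
// The shuffle boundary is what makes stage reuse possible.
val pairs = sc.parallelize(1 to 1000000).map(x => (x % 100, 1))
val counts = pairs.reduceByKey(_ + _)

counts.count()   // Job 1: all stages run, shuffle files written
counts.collect() // Job 2: the map-side stage is marked "skipped"
```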
Re: How can i remove the need for calling cache
Hi Jeff, that looks sane to me. Do you have additional details?
How can i remove the need for calling cache
Calling cache/persist fails all our jobs (I have posted 2 threads on this), and we're giving up hope of finding a solution. So I'd like to find a workaround:

If I save an RDD to HDFS and read it back, can I use it in more than one operation?

Example (using cache):

// do a whole bunch of transformations on an RDD
myrdd.cache()

val result1 = myrdd.map(op1(_))
val result2 = myrdd.map(op2(_))

// in the above I am assuming that a call to cache will prevent all
// previous transformations from being calculated twice

I'd like to somehow get result1 and result2 without duplicating work. How can I do that?

thanks

Jeff
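A minimal sketch of the save-to-HDFS-and-read-back workaround the question asks about, assuming `sc` is a SparkContext (the path, element type, and ops are illustrative):

```
// Materialize the RDD once, then reuse it through a fresh read whose
// lineage is just "read this file".
val path = "hdfs:///tmp/myrdd-materialized"
myrdd.saveAsObjectFile(path)               // action: runs the lineage once

val reloaded = sc.objectFile[String](path) // element type assumed here
val result1 = reloaded.map(op1(_))
val result2 = reloaded.map(op2(_))         // neither recomputes the original transformations
```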