You can also use checkpoint to truncate the lineage; the checkpointed data is
persisted to HDFS. Fundamentally, the state of the RDD needs to be saved to
memory or disk if you don't want to repeat the computation.
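
For example, a minimal sketch of how the loop body could look with periodic
checkpointing (the HDFS path and the every-10-iterations interval are just
illustrative choices, not a recommendation):

sc.setCheckpointDir("hdfs:///tmp/model-checkpoints")

// inside the iteration loop:
val newRDD = oldRDD.map(x => x * x)
newRDD.cache()
if (i % 10 == 0) newRDD.checkpoint() // truncates the lineage; data goes to HDFS
newRDD.count()                       // the action materializes the RDD and writes the checkpoint
oldRDD.unpersist(true)
oldRDD = newRDD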

Thanks
Shivaram

On Thu, Jul 16, 2015 at 4:59 PM, Ulanov, Alexander <alexander.ula...@hp.com>
wrote:

>  Dear Spark developers,
>
>
>
> What happens if the RDD does not fit into memory and cache would not work
> in the code below? Will all previous iterations be repeated on each new
> iteration within the iterative RDD update (as described below)?
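>
> (For instance, I am wondering whether a disk-backed storage level, e.g.
> oldRDD.persist(StorageLevel.MEMORY_AND_DISK), would avoid the recomputation
> in that case -- just to illustrate what I have in mind.)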
>
>
>
> Also, could you clarify regarding DataFrame and GC overhead: does setting
> spark.sql.unsafe.enabled=true remove the GC overhead when
> persisting/unpersisting the DataFrame?
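>
> (I assume the flag can be set via the SQL configuration, e.g.
> sqlContext.setConf("spark.sql.unsafe.enabled", "true"), before building the
> DataFrame -- please correct me if that is not the right place to set it.)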
>
>
>
> Best regards, Alexander
>
>
>
> *From:* Ulanov, Alexander
> *Sent:* Monday, July 13, 2015 11:15 AM
> *To:* shiva...@eecs.berkeley.edu
> *Cc:* dev@spark.apache.org
> *Subject:* RE: Model parallelism with RDD
>
>
>
> Below are the average timings for one iteration of the model update with
> an RDD (with cache, as Shivaram suggested):
>
> Model size (RDD[Double].count)    Time per iteration, s
> 10M                               0.585336926
> 100M                              1.767947506
> 1B                                125.6078817
>
>
>
> There is a ~100x increase in time for a 10x increase in model size (from
> 100 million to 1 billion Doubles). More than half of the time is spent in
> GC, and this time varies heavily. Two questions:
>
> 1) Can I use project Tungsten's unsafe mode? That is, can I reduce the GC
> time if I use a DataFrame instead of an RDD and set the Tungsten key
> spark.sql.unsafe.enabled=true?
>
> 2) An RDD[Double] of one billion elements is 26.1 GB persisted (as the
> Spark UI shows), i.e. around 26 bytes per element. A raw Double is 8 bytes,
> so that suggests roughly 18 bytes of overhead per element. How many bytes
> of overhead does the RDD add, and where does it come from?
>
>
>
> The code:
>
> val modelSize = 1000000000
> val numIterations = 10
> val parallelism = 5
> var oldRDD = sc.parallelize(1 to modelSize, parallelism).map(x => 0.1).cache
> var newRDD = sc.parallelize(1 to 1, parallelism).map(x => 0.1)
> var i = 0
> var avgTime = 0.0
> while (i < numIterations) {
>   val t = System.nanoTime()
>   val newRDD = oldRDD.map(x => x * x) // update the weights (shadows the outer var)
>   newRDD.cache
>   newRDD.count()           // materialize the new model while the old one is still cached
>   oldRDD.unpersist(true)
>   newRDD.mean              // "checking" convergence
>   avgTime += (System.nanoTime() - t) / 1e9
>   oldRDD = newRDD
>   i += 1
> }
> println("Avg iteration time:" + avgTime / numIterations)
>
>
>
> Best regards, Alexander
>
>
>
> *From:* Shivaram Venkataraman [mailto:shiva...@eecs.berkeley.edu
> <shiva...@eecs.berkeley.edu>]
> *Sent:* Friday, July 10, 2015 10:04 PM
> *To:* Ulanov, Alexander
> *Cc:* <shiva...@eecs.berkeley.edu>; dev@spark.apache.org
> *Subject:* Re: Model parallelism with RDD
>
>
>
> Yeah I can see that being the case -- caching implies creating objects
> that will be stored in memory. So there is a trade-off between storing data
> in memory but having to garbage collect it later vs. recomputing the data.
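>
> One middle ground you could experiment with (just a thought, I have not
> measured it for your workload) is a serialized storage level, which keeps
> fewer live Java objects at the cost of serialization overhead:
>
>   import org.apache.spark.storage.StorageLevel
>   newRDD.persist(StorageLevel.MEMORY_ONLY_SER) // serialized cache, less GC pressure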
>
>
>
> Shivaram
>
>
>
> On Fri, Jul 10, 2015 at 9:49 PM, Ulanov, Alexander <
> alexander.ula...@hp.com> wrote:
>
> Hi Shivaram,
>
> Thank you for the suggestion! If I do .cache and .count, each iteration
> takes much more time, and the extra time is spent in GC. Is that normal?
>
> On Jul 10, 2015, at 21:23, Shivaram Venkataraman <
> shiva...@eecs.berkeley.edu<mailto:shiva...@eecs.berkeley.edu>> wrote:
>
> I think you need to do `newRDD.cache()` and `newRDD.count` before you do
> oldRDD.unpersist(true) -- Otherwise it might be recomputing all the
> previous iterations each time.
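>
> Something along these lines inside the loop is what I mean (just a sketch
> of the ordering, not tested):
>
>   val newRDD = oldRDD.map(x => x * x)
>   newRDD.cache()          // mark the new model for caching
>   newRDD.count()          // force materialization while oldRDD is still cached
>   oldRDD.unpersist(true)  // now the previous iteration can be dropped safely
>   oldRDD = newRDD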
>
> Thanks
> Shivaram
>
> On Fri, Jul 10, 2015 at 7:44 PM, Ulanov, Alexander <
> alexander.ula...@hp.com<mailto:alexander.ula...@hp.com>> wrote:
> Hi,
>
> I am interested in how scalable model parallelism can be within Spark.
> Suppose the model contains N weights of type Double and N is so large that
> it does not fit into the memory of a single node. So, we can store the
> model in an RDD[Double] spread across several nodes. To train the model,
> one needs to perform K iterations that update all the weights and check for
> convergence. We also need to exchange some weights between the nodes to
> synchronize the model or update the global state. I've sketched code that
> does iterative updates with an RDD (without the global update yet).
> Surprisingly, each iteration takes more time than the previous one, as
> shown below (time in seconds). Could you suggest the reason for that? I've
> checked GC; it only takes a few milliseconds.
>
> Configuration: Spark 1.4, 1 master and 5 worker nodes, 5 executors, Intel
> Xeon 2.2, 16GB RAM each
> Iteration 0 time:1.127990986
> Iteration 1 time:1.391120414
> Iteration 2 time:1.6429691381000002
> Iteration 3 time:1.9344402954
> Iteration 4 time:2.2075294246999997
> Iteration 5 time:2.6328659593
> Iteration 6 time:2.7911690492999996
> Iteration 7 time:3.0850374104
> Iteration 8 time:3.4031050061
> Iteration 9 time:3.8826580919
>
> Code:
> val modelSize = 1000000000
> val numIterations = 10
> val parallelizm = 5
> var oldRDD = sc.parallelize(1 to modelSize, parallelizm).map(x => 0.1)
> var newRDD = sc.parallelize(1 to 1, parallelizm).map(x => 0.1)
> var i = 0
> while (i < numIterations) {
>   val t = System.nanoTime()
>   // updating the weights
>   val newRDD = oldRDD.map(x => x * x)
>   oldRDD.unpersist(true)
>   // “checking” convergence
>   newRDD.mean
>   println("Iteration " + i + " time:" + (System.nanoTime() - t) / 1e9 /
> numIterations)
>   oldRDD = newRDD
>   i += 1
> }
>
>
> Best regards, Alexander
>
>
>
