Yeah I can see that being the case -- caching implies creating objects that
will be stored in memory. So there is a trade-off between storing data in
memory but having to garbage collect it later vs. recomputing the data.


On Fri, Jul 10, 2015 at 9:49 PM, Ulanov, Alexander <>

> Hi Shivaram,
> Thank you for suggestion! If I do .cache and .count, each iteration take
> much more time, which is spent in GC. Is it normal?
> 10 июля 2015 г., в 21:23, Shivaram Venkataraman <
><>> написал(а):
> I think you need to do `newRDD.cache()` and `newRDD.count` before you do
> oldRDD.unpersist(true) -- Otherwise it might be recomputing all the
> previous iterations each time.
> Thanks
> Shivaram
> On Fri, Jul 10, 2015 at 7:44 PM, Ulanov, Alexander <
><>> wrote:
> Hi,
> I am interested how scalable can be the model parallelism within Spark.
> Suppose, the model contains N weights of type Double and N is so large that
> does not fit into the memory of a single node. So, we can store the model
> in RDD[Double] within several nodes. To train the model, one needs to
> perform K iterations that update all the weights and check the convergence.
> Then we also need to exchange some weights between the nodes to synchronize
> the model or update the global state. I’ve sketched the code that does
> iterative updates with RDD (without global update yet). Surprisingly, each
> iteration takes more time than previous as shown below (time in seconds).
> Could you suggest what is the reason for that? I’ve checked GC, it does
> something within few milliseconds.
> Configuration: Spark 1.4, 1 master and 5 worker nodes, 5 executors, Intel
> Xeon 2.2, 16GB RAM each
> Iteration 0 time:1.127990986
> Iteration 1 time:1.391120414
> Iteration 2 time:1.6429691381000002
> Iteration 3 time:1.9344402954
> Iteration 4 time:2.2075294246999997
> Iteration 5 time:2.6328659593
> Iteration 6 time:2.7911690492999996
> Iteration 7 time:3.0850374104
> Iteration 8 time:3.4031050061
> Iteration 9 time:3.8826580919
> Code:
> val modelSize = 1000000000
> val numIterations = 10
> val parallelizm = 5
> var oldRDD = sc.parallelize(1 to modelSize, parallelizm).map(x => 0.1)
> var newRDD = sc.parallelize(1 to 1, parallelizm).map(x => 0.1)
> var i = 0
> while (i < numIterations) {
>   val t = System.nanoTime()
>   // updating the weights
>   val newRDD = => x * x)
>   oldRDD.unpersist(true)
>   // “checking” convergence
>   newRDD.mean
>   println("Iteration " + i + " time:" + (System.nanoTime() - t) / 1e9 /
> numIterations)
>   oldRDD = newRDD
>   i += 1
> }
> Best regards, Alexander

Reply via email to