Yeah, I can see that being the case -- caching implies creating objects that will be stored in memory. So there is a trade-off between storing data in memory (and having to garbage-collect it later) and recomputing the data.
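The recompute-vs-store trade-off above can be illustrated in plain Scala (an editor's sketch, not from the thread): a lazy view re-runs the map function on every traversal, while a strictly materialized collection computes once but keeps the results in memory for the GC to reclaim later -- analogous to an uncached vs. cached RDD.

```scala
object CacheTradeoff extends App {
  var computeCount = 0

  // Lazy view: nothing stored, but the map function re-runs on every pass.
  val viewed = (1 to 5).view.map { x => computeCount += 1; x * x }
  val a = viewed.sum + viewed.sum   // two traversals
  assert(computeCount == 10)        // recomputed twice

  computeCount = 0
  // Strict map: results materialized once and held in memory (GC cost later).
  val stored = (1 to 5).map { x => computeCount += 1; x * x }
  val b = stored.sum + stored.sum
  assert(computeCount == 5)         // computed once, reused from memory
  assert(a == b && a == 110)        // same answer either way
}
```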
Shivaram

On Fri, Jul 10, 2015 at 9:49 PM, Ulanov, Alexander <alexander.ula...@hp.com> wrote:
> Hi Shivaram,
>
> Thank you for the suggestion! If I do .cache and .count, each iteration
> takes much more time, which is spent in GC. Is that normal?
>
> On Jul 10, 2015, at 21:23, Shivaram Venkataraman
> <shiva...@eecs.berkeley.edu> wrote:
>
> I think you need to do `newRDD.cache()` and `newRDD.count` before you do
> oldRDD.unpersist(true) -- otherwise it might be recomputing all the
> previous iterations each time.
>
> Thanks
> Shivaram
>
> On Fri, Jul 10, 2015 at 7:44 PM, Ulanov, Alexander
> <alexander.ula...@hp.com> wrote:
> Hi,
>
> I am interested in how scalable model parallelism can be within Spark.
> Suppose the model contains N weights of type Double, and N is so large
> that it does not fit into the memory of a single node. We can then store
> the model in an RDD[Double] across several nodes. To train the model, one
> needs to perform K iterations that update all the weights and check
> convergence. We also need to exchange some weights between the nodes to
> synchronize the model or update the global state. I've sketched code that
> does iterative updates with an RDD (without the global update yet).
> Surprisingly, each iteration takes more time than the previous one, as
> shown below (time in seconds). Could you suggest what the reason for that
> is? I've checked GC; it completes within a few milliseconds.
>
> Configuration: Spark 1.4, 1 master and 5 worker nodes, 5 executors, Intel
> Xeon 2.2, 16GB RAM each
>
> Iteration 0 time:1.127990986
> Iteration 1 time:1.391120414
> Iteration 2 time:1.6429691381000002
> Iteration 3 time:1.9344402954
> Iteration 4 time:2.2075294246999997
> Iteration 5 time:2.6328659593
> Iteration 6 time:2.7911690492999996
> Iteration 7 time:3.0850374104
> Iteration 8 time:3.4031050061
> Iteration 9 time:3.8826580919
>
> Code:
> val modelSize = 1000000000
> val numIterations = 10
> val parallelizm = 5
> var oldRDD = sc.parallelize(1 to modelSize, parallelizm).map(x => 0.1)
> var newRDD = sc.parallelize(1 to 1, parallelizm).map(x => 0.1)
> var i = 0
> while (i < numIterations) {
>   val t = System.nanoTime()
>   // updating the weights
>   val newRDD = oldRDD.map(x => x * x)
>   oldRDD.unpersist(true)
>   // "checking" convergence
>   newRDD.mean
>   println("Iteration " + i + " time:" + (System.nanoTime() - t) / 1e9 /
>     numIterations)
>   oldRDD = newRDD
>   i += 1
> }
>
> Best regards, Alexander
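Applying the cache-before-unpersist advice from the thread to the loop above, a minimal sketch (assuming a live SparkContext `sc`; variable names kept from Alexander's snippet) materializes newRDD while oldRDD is still cached, so each iteration reads cached partitions instead of replaying the whole lineage chain:

```scala
// Sketch only: assumes a running SparkContext bound to `sc`.
val modelSize = 1000000000
val numIterations = 10
val parallelizm = 5

var oldRDD = sc.parallelize(1 to modelSize, parallelizm).map(x => 0.1)
oldRDD.cache()
oldRDD.count()                // materialize the starting weights
var i = 0
while (i < numIterations) {
  val t = System.nanoTime()
  val newRDD = oldRDD.map(x => x * x)   // update the weights
  newRDD.cache()
  newRDD.count()              // force computation while oldRDD is still cached
  oldRDD.unpersist(true)      // safe now: newRDD no longer needs recomputation
  newRDD.mean                 // "checking" convergence
  println("Iteration " + i + " time:" + (System.nanoTime() - t) / 1e9)
  oldRDD = newRDD
  i += 1
}
```

Note that even with this pattern the lineage graph itself keeps growing across iterations, and each cached copy adds GC pressure (consistent with the slowdown Alexander saw after adding .cache/.count); calling RDD.checkpoint() periodically (after sc.setCheckpointDir) truncates the lineage at the cost of a write to stable storage.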