Hi Shivaram,

Thank you for the explanation. Is there a direct way to check the length of the 
lineage, i.e. to confirm whether the computation is being repeated?
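
For instance, would printing the RDD's debug string show this? A minimal sketch of 
what I mean (assuming toDebugString reflects the full dependency chain):

// Print the lineage of the current RDD; if it keeps growing with each
// iteration, the previous computations are presumably being repeated.
println(newRDD.toDebugString)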

Best regards, Alexander

From: Shivaram Venkataraman [mailto:shiva...@eecs.berkeley.edu]
Sent: Friday, July 17, 2015 10:10 AM
To: Ulanov, Alexander
Cc: shiva...@eecs.berkeley.edu; dev@spark.apache.org
Subject: Re: Model parallelism with RDD

You can also use checkpointing to truncate the lineage; the data is then persisted 
to HDFS. Fundamentally, the state of the RDD needs to be saved to memory or disk 
if you don't want to repeat the computation.
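
A minimal sketch (the checkpoint directory below is just a placeholder):

// Checkpointing writes the RDD to reliable storage and truncates its lineage.
sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")  // placeholder path
newRDD.checkpoint()
newRDD.count()  // an action is needed to actually materialize the checkpoint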

Thanks
Shivaram

On Thu, Jul 16, 2015 at 4:59 PM, Ulanov, Alexander 
<alexander.ula...@hp.com> wrote:
Dear Spark developers,

What happens if the RDD does not fit into memory and cache does not work in the 
code below? Will all the previous iterations be repeated at each new iteration of 
the iterative RDD update (as described below)?
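
Would persisting with a storage level that spills to disk avoid the recomputation 
in that case? A sketch of what I mean (not something I have benchmarked):

import org.apache.spark.storage.StorageLevel

// Partitions that do not fit in memory are written to local disk instead of
// being dropped, so they can be read back rather than recomputed from lineage.
newRDD.persist(StorageLevel.MEMORY_AND_DISK)
newRDD.count()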

Also, could you clarify something regarding DataFrame and GC overhead: does setting 
spark.sql.unsafe.enabled=true remove the GC overhead when persisting/unpersisting 
the DataFrame?

Best regards, Alexander

From: Ulanov, Alexander
Sent: Monday, July 13, 2015 11:15 AM
To: shiva...@eecs.berkeley.edu
Cc: dev@spark.apache.org
Subject: RE: Model parallelism with RDD

Below are the average timings for one iteration of the model update with RDD (with 
cache, as Shivaram suggested):

Model size (RDD[Double].count)    Time, s
10M                               0.585336926
100M                              1.767947506
1B                                125.6078817

There is a ~100x increase in time for a 10x increase in model size (from 100 
million to 1 billion Doubles). More than half of the time is spent in GC, and 
this time varies heavily. Two questions:
1) Can I use project Tungsten’s unsafe mode? That is, can I reduce the GC time if 
I use a DataFrame instead of an RDD and set the Tungsten key 
spark.sql.unsafe.enabled=true?
2) An RDD[Double] of one billion elements takes 26.1GB when persisted (as the Spark 
UI shows), i.e. around 26 bytes per element, whereas a raw Double is only 8 bytes. 
How many bytes of overhead does the RDD add per element?

The code:
val modelSize = 1000000000
val numIterations = 10
val parallelism = 5
// Initial model: one billion weights of 0.1, cached in memory.
var oldRDD = sc.parallelize(1 to modelSize, parallelism).map(x => 0.1).cache()
var i = 0
var avgTime = 0.0
while (i < numIterations) {
  val t = System.nanoTime()
  // Update the weights.
  val newRDD = oldRDD.map(x => x * x)
  // Materialize and cache the new model before dropping the old one,
  // so previous iterations are not recomputed.
  newRDD.cache()
  newRDD.count()
  oldRDD.unpersist(true)
  // "Check" convergence.
  newRDD.mean
  avgTime += (System.nanoTime() - t) / 1e9
  oldRDD = newRDD
  i += 1
}
println("Avg iteration time: " + avgTime / numIterations)

Best regards, Alexander

From: Shivaram Venkataraman [mailto:shiva...@eecs.berkeley.edu]
Sent: Friday, July 10, 2015 10:04 PM
To: Ulanov, Alexander
Cc: shiva...@eecs.berkeley.edu; dev@spark.apache.org
Subject: Re: Model parallelism with RDD

Yeah, I can see that being the case -- caching implies creating objects that will 
be stored in memory, so there is a trade-off between keeping the data in memory 
(and having to garbage collect it later) and recomputing the data.
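
One thing that might help with the GC pressure (just a suggestion, I haven't 
measured it on your workload) is caching the data in serialized form, which keeps 
far fewer live objects on the heap at the cost of some extra serialization CPU:

import org.apache.spark.storage.StorageLevel

// Store cached partitions as serialized byte arrays instead of object graphs.
newRDD.persist(StorageLevel.MEMORY_ONLY_SER)
newRDD.count()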

Shivaram

On Fri, Jul 10, 2015 at 9:49 PM, Ulanov, Alexander 
<alexander.ula...@hp.com> wrote:
Hi Shivaram,

Thank you for the suggestion! If I do .cache and .count, each iteration takes much 
more time, and that time is spent in GC. Is that normal?

On 10 July 2015, at 21:23, Shivaram Venkataraman 
<shiva...@eecs.berkeley.edu> wrote:

I think you need to do `newRDD.cache()` and `newRDD.count` before you do 
oldRDD.unpersist(true) -- Otherwise it might be recomputing all the previous 
iterations each time.
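
In other words, something like this inside the loop (a sketch based on your code):

val newRDD = oldRDD.map(x => x * x)
newRDD.cache()
newRDD.count()          // materialize the new RDD while the old one is still available
oldRDD.unpersist(true)  // only now drop the previous iteration
newRDD.mean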

Thanks
Shivaram
On Fri, Jul 10, 2015 at 7:44 PM, Ulanov, Alexander 
<alexander.ula...@hp.com> wrote:
Hi,

I am interested in how scalable model parallelism can be within Spark. Suppose the 
model contains N weights of type Double, and N is so large that the model does not 
fit into the memory of a single node. So we can store the model in an RDD[Double] 
spread across several nodes. To train the model, one needs to perform K iterations 
that update all the weights and check for convergence. We also need to exchange 
some weights between the nodes to synchronize the model or update the global state. 
I’ve sketched the code that does iterative updates with an RDD (without the global 
update yet). Surprisingly, each iteration takes more time than the previous one, as 
shown below (time in seconds). Could you suggest what the reason for that might be? 
I’ve checked GC; it only does something within a few milliseconds.

Configuration: Spark 1.4, 1 master and 5 worker nodes, 5 executors, Intel Xeon 
2.2 GHz, 16GB RAM each
Iteration 0 time:1.127990986
Iteration 1 time:1.391120414
Iteration 2 time:1.6429691381000002
Iteration 3 time:1.9344402954
Iteration 4 time:2.2075294246999997
Iteration 5 time:2.6328659593
Iteration 6 time:2.7911690492999996
Iteration 7 time:3.0850374104
Iteration 8 time:3.4031050061
Iteration 9 time:3.8826580919

Code:
val modelSize = 1000000000
val numIterations = 10
val parallelism = 5
var oldRDD = sc.parallelize(1 to modelSize, parallelism).map(x => 0.1)
var i = 0
while (i < numIterations) {
  val t = System.nanoTime()
  // updating the weights
  val newRDD = oldRDD.map(x => x * x)
  oldRDD.unpersist(true)
  // "checking" convergence
  newRDD.mean
  println("Iteration " + i + " time:" + (System.nanoTime() - t) / 1e9 / numIterations)
  oldRDD = newRDD
  i += 1
}


Best regards, Alexander

