Re: Model parallelism with RDD

2015-07-17 Thread Shivaram Venkataraman
You can also use checkpoint to truncate the lineage; the data can be
persisted to HDFS. Fundamentally, the state of the RDD needs to be saved to
memory or disk if you don't want to repeat the computation.

Thanks
Shivaram
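
A minimal sketch of how checkpoint could be used inside the iterative loop from
the code further below (the checkpoint directory, interval, and sizes are
illustrative, not taken from the thread):

sc.setCheckpointDir("hdfs:///tmp/model-checkpoints")  // illustrative path

var model = sc.parallelize(1 to 1000000, 5).map(x => 0.1)
for (i <- 0 until 10) {
  val updated = model.map(x => x * x)
  updated.cache()
  // checkpoint() must be called before the first action on the RDD
  if (i % 3 == 0) updated.checkpoint()
  updated.count()  // materializes the RDD and writes the checkpoint, cutting the lineage
  model.unpersist(true)
  model = updated
}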

RE: Model parallelism with RDD

2015-07-17 Thread Ulanov, Alexander
Hi Shivaram,

Thank you for the explanation. Is there a direct way to check the length of the
lineage, i.e., whether the computation is being repeated?

Best regards, Alexander
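
(One way to inspect the lineage, sketched here rather than taken from the thread:
RDD.toDebugString prints the dependency graph, and a chain that keeps growing
across iterations means earlier map steps would be recomputed unless the RDD is
cached or checkpointed.)

// inside the loop, using the variable names from the code below
println("Iteration " + i + " lineage:\n" + newRDD.toDebugString)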

RE: Model parallelism with RDD

2015-07-16 Thread Ulanov, Alexander
Dear Spark developers,

What happens if the RDD does not fit into memory and cache does not work in the
code below? Will all previous iterations be repeated on each new iteration of the
iterative RDD update (as described below)?

Also, could you clarify regarding DataFrame and GC overhead: does setting
spark.sql.unsafe.enabled=true remove the GC overhead when persisting/unpersisting
the DataFrame?

Best regards, Alexander
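
(A sketch of one option for the first question, not from the thread: persisting
with a storage level that spills to disk writes partitions that do not fit in
memory to local disk, so they are read back instead of being recomputed from the
lineage. Variable names follow the code in the 2015-07-13 message below.)

import org.apache.spark.storage.StorageLevel

newRDD.persist(StorageLevel.MEMORY_AND_DISK)  // cache() uses MEMORY_ONLY; this spills instead of dropping
newRDD.count()
oldRDD.unpersist(true)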

RE: Model parallelism with RDD

2015-07-13 Thread Ulanov, Alexander
Below are the average timings for one iteration of model update with RDD (with
cache, as Shivaram suggested):

Model size (RDD[Double].count)   Time, s
10M                              0.585336926
100M                             1.767947506
1B                               125.6078817

There is a ~100x increase in time for a 10x increase in model size (from 100
million to 1 billion Doubles). More than half of the time is spent in GC, and
this time varies heavily. Two questions:
1) Can I use Project Tungsten's unsafe mode? That is, can I reduce the GC time if
I use DataFrame instead of RDD and set the Tungsten key
spark.sql.unsafe.enabled=true?
2) An RDD[Double] of one billion elements is 26.1 GB persisted (as the Spark UI
shows), which is around 26 bytes per element. How many bytes is the RDD overhead?

The code:
val modelSize = 10
val numIterations = 10
val parallelism = 5
var oldRDD = sc.parallelize(1 to modelSize, parallelism).map(x => 0.1).cache
var newRDD = sc.parallelize(1 to 1, parallelism).map(x => 0.1)
var i = 0
var avgTime = 0.0
while (i < numIterations) {
  val t = System.nanoTime()
  val newRDD = oldRDD.map(x => x * x)
  newRDD.cache
  newRDD.count()
  oldRDD.unpersist(true)
  newRDD.mean
  avgTime += (System.nanoTime() - t) / 1e9
  oldRDD = newRDD
  i += 1
}
println("Avg iteration time: " + avgTime / numIterations)

Best regards, Alexander
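
(A note and sketch added for context, not from the thread: on a 64-bit JVM a
boxed java.lang.Double typically takes 16 bytes plus a reference in the containing
array, which is roughly consistent with the observed ~26 bytes per element. One
common way to cut the overhead and the GC pressure is to store one primitive
Array[Double] per record instead of one boxed Double per element; the chunk sizes
below are illustrative.)

val chunkSize = 1000000  // illustrative
val numChunks = 1000     // illustrative, ~1B doubles in total
var model = sc.parallelize(0 until numChunks, parallelism)
  .map(_ => Array.fill(chunkSize)(0.1))
  .cache()
// one update step operates on primitive arrays rather than boxed elements
val updated = model.map(arr => arr.map(x => x * x))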

Model parallelism with RDD

2015-07-10 Thread Ulanov, Alexander
Hi,

I am interested in how scalable model parallelism can be within Spark.
Suppose the model contains N weights of type Double, and N is so large that it
does not fit into the memory of a single node, so we can store the model in an
RDD[Double] across several nodes. To train the model, one needs to perform K
iterations that update all the weights and check for convergence. We also need to
exchange some weights between the nodes to synchronize the model or update the
global state. I've sketched the code that does iterative updates with RDD (without
the global update yet). Surprisingly, each iteration takes more time than the
previous one, as shown below (time in seconds). Could you suggest what the reason
for that is? I've checked GC; it does something within a few milliseconds.

Configuration: Spark 1.4, 1 master and 5 worker nodes, 5 executors, Intel Xeon 
2.2, 16GB RAM each
Iteration 0 time:1.127990986
Iteration 1 time:1.391120414
Iteration 2 time:1.642969138102
Iteration 3 time:1.9344402954
Iteration 4 time:2.207529424697
Iteration 5 time:2.6328659593
Iteration 6 time:2.791169049296
Iteration 7 time:3.0850374104
Iteration 8 time:3.4031050061
Iteration 9 time:3.8826580919

Code:
val modelSize = 10
val numIterations = 10
val parallelizm = 5
var oldRDD = sc.parallelize(1 to modelSize, parallelizm).map(x => 0.1)
var newRDD = sc.parallelize(1 to 1, parallelizm).map(x => 0.1)
var i = 0
while (i < numIterations) {
  val t = System.nanoTime()
  // updating the weights
  val newRDD = oldRDD.map(x => x * x)
  oldRDD.unpersist(true)
  // checking convergence
  newRDD.mean
  println("Iteration " + i + " time:" + (System.nanoTime() - t) / 1e9 / numIterations)
  oldRDD = newRDD
  i += 1
}


Best regards, Alexander


Re: Model parallelism with RDD

2015-07-10 Thread Shivaram Venkataraman
I think you need to do `newRDD.cache()` and `newRDD.count` before you do
oldRDD.unpersist(true) -- Otherwise it might be recomputing all the
previous iterations each time.

Thanks
Shivaram
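
In loop form, the suggested ordering is roughly (a sketch using the variable
names from the original code):

  val newRDD = oldRDD.map(x => x * x)
  newRDD.cache()          // mark the new state for storage
  newRDD.count()          // materialize it while the old state is still available
  oldRDD.unpersist(true)  // only now drop the old state
  oldRDD = newRDD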

Re: Model parallelism with RDD

2015-07-10 Thread Shivaram Venkataraman
Yeah I can see that being the case -- caching implies creating objects that
will be stored in memory. So there is a trade-off between storing data in
memory but having to garbage collect it later vs. recomputing the data.

Shivaram

On Fri, Jul 10, 2015 at 9:49 PM, Ulanov, Alexander alexander.ula...@hp.com
wrote:

 Hi Shivaram,

Thank you for the suggestion! If I do .cache and .count, each iteration takes
much more time, which is spent in GC. Is that normal?
