Re: Iteration question

2014-07-15 Thread Matei Zaharia
Hi Nathan,

I think there are two possible reasons for this. One is that even though you
are caching RDDs, their lineage chain gets longer and longer, and thus
serializing each RDD takes more time. You can cut off the chain by calling
RDD.checkpoint() periodically, say every 5-10 iterations. The second reason may
just be garbage accumulating in the JVM, causing more collection time as the
iterations go on.
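
A minimal sketch of what periodic checkpointing could look like in a loop of this shape. It is an illustration under assumptions, not the original test code: the object name CheckpointSketch, the helper iterate, the checkpoint directory /tmp/spark-checkpoints and the interval of 10 are all made up for the example, and the update step is simplified to a plain map instead of the zip used in the question.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.util.Random

object CheckpointSketch {
  type Coords = RDD[(Int, (Double, Double))]

  // Hypothetical helper: runs `iterations` update steps and checkpoints
  // every `checkpointInterval` steps to truncate the lineage chain.
  def iterate(sc: SparkContext, points: Int, iterations: Int,
              checkpointInterval: Int): Coords = {
    var coords: Coords =
      sc.parallelize(0 until points).map(i => (i, (0.0, 0.0))).cache()
    coords.count()

    for (iteration <- 1 to iterations) {
      val next: Coords = coords.map { case (i, (x, y)) =>
        (i, (x + Random.nextDouble(), y + Random.nextDouble()))
      }.cache()
      // checkpoint() only marks the RDD. It has to be called before the
      // first action on that RDD; the data is written when the action runs.
      if (iteration % checkpointInterval == 0) next.checkpoint()
      next.count()
      coords.unpersist(false)
      coords = next
    }
    coords
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("checkpoint-sketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // Assumed path; on a cluster this should be a shared, reliable location.
    sc.setCheckpointDir("/tmp/spark-checkpoints")
    val result = iterate(sc, points = 5000, iterations = 100, checkpointInterval = 10)
    // Prints the remaining lineage of the final RDD for inspection.
    println(result.toDebugString)
    sc.stop()
  }
}
```

Caching the RDD before marking it for checkpointing means the checkpoint write can read the cached partitions rather than recompute them.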

Matei




Iteration question

2014-07-11 Thread Nathan Kronenfeld
Hi, folks.

We're having a problem with iteration that I don't understand.

We have the following test code:

org.apache.log4j.Logger.getLogger("org").setLevel(org.apache.log4j.Level.WARN)
org.apache.log4j.Logger.getLogger("akka").setLevel(org.apache.log4j.Level.WARN)

def test (caching: Boolean, points: Int, iterations: Int) {
  var coords = sc.parallelize(Array.fill(points)(0.0, 0.0).zipWithIndex.map(_.swap))
  if (caching) coords.cache
  coords.count

  var iteration = 0
  val times = new Array[Double](iterations)

  do {
    val start = System.currentTimeMillis
    val thisIteration = iteration
    val increments = sc.parallelize(for (i <- 1 to points) yield (math.random, math.random))
    val newcoords = coords.zip(increments).map(p =>
      {
        if (0 == p._1._1) println("Processing iteration " + thisIteration)
        (p._1._1,
         (p._1._2._1 + p._2._1,
          p._1._2._2 + p._2._2))
      }
    )
    if (caching) newcoords.cache
    newcoords.count
    if (caching) coords.unpersist(false)
    coords = newcoords
    val end = System.currentTimeMillis

    times(iteration) = (end-start)/1000.0
    println("Done iteration " + iteration + " in " + times(iteration) + " seconds")
    iteration = iteration + 1
  } while (iteration < iterations)

  for (i <- 0 until iterations) {
    println("Iteration " + i + ": " + times(i))
  }
}

If you run this on a local server with caching on and off, it appears that
the caching does what it is supposed to do - only the latest iteration is
processed each time through the loop.

However, despite this, the time for each iteration still gets slower and
slower.
For example, calling test(true, 5000, 100), I get the following times
(weeding out a few for brevity):
Iteration 0: 0.084
Iteration 10: 0.381
Iteration 20: 0.674
Iteration 30: 0.975
Iteration 40: 1.254
Iteration 50: 1.544
Iteration 60: 1.802
Iteration 70: 2.147
Iteration 80: 2.469
Iteration 90: 2.715
Iteration 99: 2.962

That's a 35x increase between the first and last iteration, when it should
be doing the same thing each time!
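
As a quick check of that figure, the cached timings listed above can be plugged into a few lines of Scala. This is only a sketch: the object name TimingGrowth and its helpers are made up for illustration, and the only data used is the list of times quoted above. The ratio of last to first comes out at about 35.3, and the average growth is about 0.029 seconds per iteration, which looks roughly linear over the run.

```scala
object TimingGrowth {
  // (iteration, seconds) pairs copied from the cached run listed above
  val cached: Seq[(Int, Double)] = Seq(
    0 -> 0.084, 10 -> 0.381, 20 -> 0.674, 30 -> 0.975, 40 -> 1.254,
    50 -> 1.544, 60 -> 1.802, 70 -> 2.147, 80 -> 2.469, 90 -> 2.715,
    99 -> 2.962)

  // Ratio of the last measured time to the first
  def ratio(samples: Seq[(Int, Double)]): Double =
    samples.last._2 / samples.head._2

  // Average increase in seconds per iteration between first and last sample
  def slope(samples: Seq[(Int, Double)]): Double =
    (samples.last._2 - samples.head._2) / (samples.last._1 - samples.head._1)

  def main(args: Array[String]): Unit = {
    println("Last/first ratio: " + ratio(cached))
    println("Seconds added per iteration: " + slope(cached))
  }
}
```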

Without caching, the numbers are
Iteration 0: 0.642
Iteration 10: 0.516
Iteration 20: 0.823
Iteration 30: 1.17
Iteration 40: 1.514
Iteration 50: 1.655
Iteration 60: 1.992
Iteration 70: 2.177
Iteration 80: 2.472
Iteration 90: 2.814
Iteration 99: 3.018

slightly slower - but not significantly.

Does anyone know, if the caching is working, why is iteration 100 slower
than iteration 1?  And why is caching making so little difference?


Thanks,
-Nathan Kronenfeld

-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com