Re: Advantage of using cache()

2014-08-23 Thread Patrick Wendell
Yep - that's correct. As an optimization we save the shuffle output and
re-use it if you execute a stage twice. So this can make A/B tests like
this a bit confusing.
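
To see the benefit of cache() in isolation you need expensive work that
isn't a shuffle. A minimal sketch (the dataset size, the timing helper, and
the CPU-heavy map below are illustrative assumptions, not code from this
thread):

  // Expensive narrow transformation: there is no shuffle here, so saved
  // shuffle output cannot mask the effect of caching.
  val expensive = sc.parallelize(1 to 1000000, 100).map { x =>
    var acc = 0.0
    var i = 0
    while (i < 1000) { acc += math.sqrt(x.toDouble + i); i += 1 } // burn CPU
    (x, acc)
  }.cache() // comment this out for the no-cache run

  def time[T](label: String)(body: => T): T = {
    val start = System.nanoTime()
    val result = body
    println(label + ": " + (System.nanoTime() - start) / 1e9 + "s")
    result
  }

  time("first count")(expensive.count())  // pays the full cost either way
  time("second count")(expensive.count()) // fast only when cached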

- Patrick

On Friday, August 22, 2014, Nieyuan qiushuiwuh...@gmail.com wrote:

 Because map-reduce tasks like join will save shuffle data to disk, the
 only difference between the caching and no-caching versions is:
   .map { case (x, (n, i)) => (x, n) }



 -
 Thanks,
 Nieyuan




Re: Advantage of using cache()

2014-08-22 Thread Nieyuan
Because map-reduce tasks like join will save shuffle data to disk, the
only difference between the caching and no-caching versions is:
   .map { case (x, (n, i)) => (x, n) }
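
Put differently, here is a rough sketch of why (left and right are
placeholder RDDs, not names from the code above): without cache(), a second
action only re-runs that cheap map on top of shuffle files already on disk.

  // The join's shuffle output is written to disk during the first action.
  val joined = left.join(right)
  // A second action re-reads those shuffle files and re-applies only this
  // cheap map, so cache() saves almost nothing here:
  val rdd = joined.map { case (x, (n, i)) => (x, n) }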



-
Thanks,
Nieyuan




Re: Advantage of using cache()

2014-08-21 Thread Grzegorz Białek
Hi,
thank you for your response. I removed the issues you mentioned. Now I read
the RDDs from files, the whole rdd is cached, I don't use random numbers,
and rdd1 and rdd2 are identical. The RDDs that are joined contain 100k
entries each and the result contains 10m entries; rdd1 and rdd2 after the
join also contain 10m entries. Here is the code:

  val help = sc.textFile("input/tab1")
    .map { s => val pair = s.split("\t"); (pair(0).toInt, pair(1).toInt) }
    .repartition(100)
  val rdd = sc.textFile("input/tab2")
    .map { s => val pair = s.split("\t"); (pair(0).toInt, pair(1).toInt) }
    .repartition(100)
    .join(help)
    .map { case (x, (n, i)) => (x, n) }
    .cache() // or without it

  val rdd1 = sc.parallelize(Array.range(0, 1000)).map(x => (x, x))
    .join(rdd).saveAsTextFile("output/1")
  val rdd2 = sc.parallelize(Array.range(0, 1000)).map(x => (x, x))
    .join(rdd).saveAsTextFile("output/2")

Files input/tab1 and input/tab2 were generated using this Python code
(100k entries each, matching the sizes above):

for x in range(100000):
    file1.write("%d\t%d\n" % (random.randint(0, 1000), x))
    file2.write("%d\t%d\n" % (random.randint(0, 1000), x))

When using cache the whole rdd is cached; its size is 362MB.
The results are similar:
Without cache: stages used to compute rdd: 30s, rdd1: 16s, rdd2: 12s
With cache: stages used to compute rdd: 28s, rdd1: 14s, rdd2: 15s

I thought that without caching rdd, computing rdd2 would take much longer
due to recomputing rdd, but it seems it doesn't work that way. Could you
explain this or point me to an example which shows the power of caching?

Thanks,
Grzegorz


On Wed, Aug 20, 2014 at 11:22 PM, Patrick Wendell pwend...@gmail.com
wrote:

 Your rdd2 and rdd3 differ in two ways so it's hard to track the exact
 effect of caching. In rdd3, in addition to the fact that rdd will be
 cached, you are also doing a bunch of extra random number generation. So it
 will be hard to isolate the effect of caching.


 On Wed, Aug 20, 2014 at 7:48 AM, Grzegorz Białek 
 grzegorz.bia...@codilime.com wrote:

 Hi,

 I tried to write a small program which shows that using cache() can speed
 up execution, but the results with and without cache were similar. Could
 you help me with this issue? I compute rdd and use it later in two places,
 and I thought that in the second usage this rdd would be recomputed, but
 it isn't:

   val help = sc.parallelize(Array.range(1, 2)).repartition(100)
     .map(x => (scala.util.Random.nextInt(10), x))
   val rdd = sc.parallelize(Array.range(1, 2))
     .repartition(100)
     .map(x => (scala.util.Random.nextInt(10), x))
     .join(help)
     .map { case (x, (n, i)) => (x, n) }
     .reduceByKey(_ + _)
     .cache()

   val rdd2 = sc.parallelize(Array.range(1, 1000)).map(x => (x, x))
     .join(rdd).saveAsTextFile("output/1")
   val rdd3 = sc.parallelize(Array.range(1, 1000)).map(x =>
     (scala.util.Random.nextInt(1000), x))
     .join(rdd).saveAsTextFile("output/2")

 Thanks,
 Grzegorz





Advantage of using cache()

2014-08-20 Thread Grzegorz Białek
Hi,

I tried to write a small program which shows that using cache() can speed
up execution, but the results with and without cache were similar. Could
you help me with this issue? I compute rdd and use it later in two places,
and I thought that in the second usage this rdd would be recomputed, but
it isn't:

  val help = sc.parallelize(Array.range(1, 2)).repartition(100)
    .map(x => (scala.util.Random.nextInt(10), x))
  val rdd = sc.parallelize(Array.range(1, 2))
    .repartition(100)
    .map(x => (scala.util.Random.nextInt(10), x))
    .join(help)
    .map { case (x, (n, i)) => (x, n) }
    .reduceByKey(_ + _)
    .cache()

  val rdd2 = sc.parallelize(Array.range(1, 1000)).map(x => (x, x))
    .join(rdd).saveAsTextFile("output/1")
  val rdd3 = sc.parallelize(Array.range(1, 1000)).map(x =>
    (scala.util.Random.nextInt(1000), x))
    .join(rdd).saveAsTextFile("output/2")

Thanks,
Grzegorz


Re: Advantage of using cache()

2014-08-20 Thread Patrick Wendell
Your rdd2 and rdd3 differ in two ways so it's hard to track the exact
effect of caching. In rdd3, in addition to the fact that rdd will be
cached, you are also doing a bunch of extra random number generation. So it
will be hard to isolate the effect of caching.
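
One way to make the comparison apples-to-apples (a sketch of what I mean,
not code from your mail; "probe" is a made-up name) is to run the exact
same job twice against the cached RDD:

  val probe = sc.parallelize(Array.range(1, 1000)).map(x => (x, x))
  probe.join(rdd).saveAsTextFile("output/1") // computes rdd, populates the cache
  probe.join(rdd).saveAsTextFile("output/2") // identical job; should hit the cache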


On Wed, Aug 20, 2014 at 7:48 AM, Grzegorz Białek 
grzegorz.bia...@codilime.com wrote:

 Hi,

 I tried to write a small program which shows that using cache() can speed
 up execution, but the results with and without cache were similar. Could
 you help me with this issue? I compute rdd and use it later in two places,
 and I thought that in the second usage this rdd would be recomputed, but
 it isn't:

   val help = sc.parallelize(Array.range(1, 2)).repartition(100)
     .map(x => (scala.util.Random.nextInt(10), x))
   val rdd = sc.parallelize(Array.range(1, 2))
     .repartition(100)
     .map(x => (scala.util.Random.nextInt(10), x))
     .join(help)
     .map { case (x, (n, i)) => (x, n) }
     .reduceByKey(_ + _)
     .cache()

   val rdd2 = sc.parallelize(Array.range(1, 1000)).map(x => (x, x))
     .join(rdd).saveAsTextFile("output/1")
   val rdd3 = sc.parallelize(Array.range(1, 1000)).map(x =>
     (scala.util.Random.nextInt(1000), x))
     .join(rdd).saveAsTextFile("output/2")

 Thanks,
 Grzegorz
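
A quick way to check whether rdd is really recomputed is to count how often
its map body runs. An illustrative sketch using the Spark 1.x accumulator
API ("base" and "evals" are made-up names, not from your code):

  // Count how many times the map body actually executes across all tasks.
  val evals = sc.accumulator(0)
  val instrumented = base.map { x => evals += 1; (x, x) }.cache()
  instrumented.count()
  println(evals.value) // N after the first action
  instrumented.count()
  println(evals.value) // still N on a cache hit; 2N if recomputed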