[Performance] Possible regression in rdd.take()?

2015-02-18 Thread Matt Cheah
Hi everyone,

Between Spark 1.0.2 and Spark 1.1.1, I have noticed that rdd.take()
consistently has a slower execution time on the later release. I was
wondering if anyone else has had similar observations.

I have two setups where this reproduces. The first is a local test: I
launched a Spark cluster with 4 worker JVMs on my Mac and opened a Spark
shell. I loaded a text file and immediately called rdd.take(N) on it,
where N varied. The RDD is a plaintext CSV, 4GB in size, split over 8
files, which ends up as 128 partitions and a total of 8000 rows.
The timings I measured on Spark 1.0.2 and Spark 1.1.1, in seconds, are:
1 item
Spark 1.0.2: 0.069281, 0.012261, 0.011083
Spark 1.1.1: 0.11577, 0.097636, 0.11321

4 items
Spark 1.0.2: 0.023751, 0.069365, 0.023603
Spark 1.1.1: 0.224287, 0.229651, 0.158431

10 items
Spark 1.0.2: 0.047019, 0.049056, 0.042568
Spark 1.1.1: 0.353277, 0.288965, 0.281751

40 items
Spark 1.0.2: 0.216048, 0.198049, 0.796037
Spark 1.1.1: 1.865622, 2.224424, 2.037672

This small test suite indicates a consistently reproducible performance
regression.
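
For concreteness, the shape of the local test is roughly the following
spark-shell session. The path and the timing helper here are illustrative,
not the exact code I ran:

val rdd = sc.textFile("/path/to/csv-dir")   // 8 files, ending up as 128 partitions

// crude wall-clock timer, only to show how the numbers above were gathered
def time[T](body: => T): T = {
  val start = System.nanoTime()
  val result = body
  println(f"took ${(System.nanoTime() - start) / 1e9}%.6f s")
  result
}

time { rdd.take(1) }
time { rdd.take(4) }
time { rdd.take(10) }
time { rdd.take(40) }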



I also notice this in a larger-scale test. The cluster is on EC2:

EC2 instance type: m2.4xlarge
10 slaves, 1 master
ephemeral storage
70 cores, 50 GB/box

In this case, I have a 100GB dataset split into 78 files totaling 350 million
items, and I take the first 50,000 items from the RDD. I have tested this on
different formats of the raw data.

With plaintext files:
Spark 1.0.2: 0.422s, 0.363s, 0.382s
Spark 1.1.1: 4.54s, 1.28s, 1.221s, 1.13s

With snappy-compressed Avro files:
Spark 1.0.2: 0.73s, 0.395s, 0.426s
Spark 1.1.1: 4.618s, 1.81s, 1.158s, 1.333s

This again demonstrates a reproducible performance regression.

Has anyone else observed this regression? And if so, does anyone have an
idea of what could have caused it between Spark 1.0.2 and Spark 1.1.1?

Thanks,

-Matt Cheah






Re: [Performance] Possible regression in rdd.take()?

2015-02-18 Thread Patrick Wendell
I believe the heuristic governing how take() decides to fetch partitions
changed between these versions. It could be that the new heuristic is worse
in certain cases, but it might be good to look at the source code and see,
for your number of elements taken and number of partitions, whether there
was any effective change in how aggressively Spark fetches partitions.
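
From memory, the driver-side loop in take(num) has roughly the following
shape. This is a paraphrase, not the exact source of either release, and
the constants and growth rule may differ:

// Sketch of the take(num) heuristic: scan partitions in waves, growing the
// wave size based on how many items have come back so far.
def takeWaves(num: Int, totalParts: Int, itemsPerPartition: Int): Seq[Int] = {
  val waves = scala.collection.mutable.ArrayBuffer[Int]()
  var collected = 0
  var partsScanned = 0
  while (collected < num && partsScanned < totalParts) {
    val wanted =
      if (partsScanned == 0) 1                                    // first wave: one partition
      else if (collected == 0) partsScanned * 4                   // nothing found yet: grow fast
      else math.ceil(1.5 * num * partsScanned / collected).toInt  // interpolate with headroom
    val thisWave = math.min(math.max(wanted, 1), totalParts - partsScanned)
    waves += thisWave             // each wave corresponds to one job over those partitions
    partsScanned += thisWave
    collected += thisWave * itemsPerPartition
  }
  waves.toSeq
}

// e.g. takeWaves(40, 128, 60) finishes in a single wave of one partition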

This was quite a while ago, but I think the change was made because in
many cases the newer code works more efficiently.

- Patrick

On Wed, Feb 18, 2015 at 4:47 PM, Matt Cheah mch...@palantir.com wrote:



Re: [Performance] Possible regression in rdd.take()?

2015-02-18 Thread Matt Cheah
I actually tested Spark 1.2.0 with the code in the rdd.take() method
swapped out for what was in Spark 1.0.2. The run time was still slower,
which suggests to me that something lower in the stack is at work.

-Matt Cheah

On 2/18/15, 4:54 PM, Patrick Wendell pwend...@gmail.com wrote:



Re: [Performance] Possible regression in rdd.take()?

2015-02-18 Thread Matt Cheah
Ah okay, I turned on spark.localExecution.enabled and the performance
returned to what it was on Spark 1.0.2. However, I can see how users could
inadvertently incur memory and network strain by fetching a whole
partition to the driver.
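
For anyone else who hits this, I set it roughly like the following before
creating the SparkContext (the app name and the rest of the conf are
illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// Re-enables driver-local execution for actions like take() on the first partition(s)
val conf = new SparkConf()
  .setAppName("take-timing")   // illustrative
  .set("spark.localExecution.enabled", "true")
val sc = new SparkContext(conf)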

I'll evaluate on my side whether we want to turn this on or not. Thanks for
the quick and accurate response!

-Matt Cheah

From: Aaron Davidson ilike...@gmail.com
Date: Wednesday, February 18, 2015 at 5:25 PM
To: Matt Cheah mch...@palantir.com
Cc: Patrick Wendell pwend...@gmail.com, dev@spark.apache.org, Mingyu Kim m...@palantir.com, Sandor Van Wassenhove sand...@palantir.com
Subject: Re: [Performance] Possible regression in rdd.take()?



Re: [Performance] Possible regression in rdd.take()?

2015-02-18 Thread Aaron Davidson
You might be seeing the result of this patch:

https://github.com/apache/spark/commit/d069c5d9d2f6ce06389ca2ddf0b3ae4db72c5797

which was introduced in 1.1.1. This patch disabled the ability for take()
to run without launching a Spark job, which means that the latency is
significantly increased for small jobs (but not for large ones). You can
try enabling local execution and see if your problem goes away.
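
If you just want to try it quickly, the flag can also be set at launch time,
e.g. ./bin/spark-shell --conf spark.localExecution.enabled=true (if your
spark-submit supports --conf), or by adding a spark.localExecution.enabled
line to conf/spark-defaults.conf.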

On Wed, Feb 18, 2015 at 5:10 PM, Matt Cheah mch...@palantir.com wrote:
