Re: MapValues and Shuffle Reads

2015-02-17 Thread Imran Rashid
Hi Darin,

You are asking for something near and dear to me:
https://issues.apache.org/jira/browse/SPARK-1061

There is a PR attached there as well.  Note that you could do everything in
that PR in your own user code (you don't need to wait for it to be merged),
*except* for the change to HadoopRDD so that it sorts the input partitions.
(Though of course, you could always just have your own implementation of
HadoopRDD as well ...)

You could also vote for the issue and watch it to encourage some progress on
it :)

On Tue, Feb 17, 2015 at 2:56 PM, Darin McBeath ddmcbe...@yahoo.com wrote:

 Thanks Imran.

 I think you are probably correct.  I was a bit surprised that there was no
 shuffle read in the initial hash partition step.  I will adjust the code as
 you suggest to prove that is the case.

 I have a slightly different question.  If I save an RDD to S3 (or some
 equivalent) and this RDD was hash partitioned at the time, do I still need to
 hash partition the RDD again when I read it in?  Is there a way that I could
 prevent all of the shuffling (such as providing a hint)?  (My parts for the
 RDD will be gzipped, so they would not be splittable.)  In reality, that's
 what I would really want to do in the first place.
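
 (For concreteness, the save step described here might look roughly like the
 sketch below.  It is only a sketch: baselineOutputBucketDir and the inline
 conversion back to Writables are made-up stand-ins, mirroring the
 ConvertFromWritableTypes read path further down.  Each partition of the
 hash-partitioned RDD becomes one gzipped part file, and since gzip parts are
 not splittable, each part should come back as a single partition on a later
 read.)

 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.PairFunction;
 import scala.Tuple2;

 // Convert the String pairs back to Writables so they can go into a
 // SequenceFile (inline stand-in for a ConvertToWritableTypes-style helper).
 JavaPairRDD<Text, Text> writableBaselinePairRDD =
     hsfBaselinePairRDD.mapToPair(
         new PairFunction<Tuple2<String, String>, Text, Text>() {
           public Tuple2<Text, Text> call(Tuple2<String, String> kv) {
             return new Tuple2<Text, Text>(new Text(kv._1()), new Text(kv._2()));
           }
         });

 // One gzipped SequenceFile part per partition (gzip parts are not splittable).
 writableBaselinePairRDD.saveAsHadoopFile(
     baselineOutputBucketDir,          // hypothetical S3 output path
     Text.class, Text.class,
     SequenceFileOutputFormat.class,
     GzipCodec.class);

 (Whether Spark can then reuse that layout on the read side without another
 partitionBy is exactly what SPARK-1061, mentioned above, is about.)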

 Thanks again for your insights.

 Darin.

   --
 From: Imran Rashid iras...@cloudera.com
 To: Darin McBeath ddmcbe...@yahoo.com
 Cc: User user@spark.apache.org
 Sent: Tuesday, February 17, 2015 3:29 PM
 Subject: Re: MapValues and Shuffle Reads

 Hi Darin,

 When you say you see 400GB of shuffle writes from the first code snippet,
 what do you mean?  There is no action in that first snippet, so by itself it
 won't do any shuffle writing, or anything else for that matter.

 Most likely, the .count() in your second code snippet is actually causing the
 execution of some of the first snippet as well.  The .partitionBy will result
 in both shuffle writes and shuffle reads, but they aren't set in motion until
 the .count further down the line.  It's confusing because the stage boundaries
 don't line up exactly with your RDD variables here: hsfBaselinePairRDD spans
 two stages, and baselinePairRDD actually gets merged into the stage above it.
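
 (To make that concrete, here is roughly how the operations line up with the
 stages for that count() job, using the figures from the original post; the
 stage labels are illustrative only.)

 // Stage A (shuffle map stage for partitionBy):
 //   sc.hadoopFile(...)                          read the 1TB sequence file
 //     .mapToPair(new ConvertFromWritableTypes())
 //     -> shuffle WRITE for partitionBy          (the ~400GB observed)
 //
 // Stage B (result stage):
 //   <- shuffle READ for partitionBy             (the ~340GB observed)
 //   hsfBaselinePairRDD persisted at this point
 //     .mapValues(new ExtractEpoch(...))         narrow, so merged into this stage
 //     .count()                                  the action that launched the job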

 If you do an hsfBaselinePairRDD.count() after your first code snippet and
 then run the second code snippet afterwards, is it more like what you
 expect?
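
 (Concretely, something like the sketch below: force the partitionBy shuffle in
 its own job first, so the later count should read the persisted partitions
 instead of re-running the shuffle, assuming the persisted blocks remain
 available.)

 // Run an action on the hash-partitioned RDD first; the partitionBy shuffle
 // write and read should now show up in this job, and the RDD gets persisted.
 long rawBaselineCount = hsfBaselinePairRDD.count();
 log.info("Raw baseline records: " + rawBaselineCount);

 // The second snippet then runs against the persisted partitions, so its job
 // should show little or no shuffle read (as long as the blocks stay cached
 // in memory or on disk).
 JavaPairRDD<String, Long> baselinePairRDD =
     hsfBaselinePairRDD
         .mapValues(new ExtractEpoch(accumBadBaselineRecords))
         .persist(StorageLevel.MEMORY_AND_DISK_SER());
 log.info("Number of baseline records: " + baselinePairRDD.count());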

 Imran



 On Tue, Feb 17, 2015 at 1:52 PM, Darin McBeath 
 ddmcbe...@yahoo.com.invalid wrote:

 In the following code, I read in a large sequence file from S3 (1TB) spread
 across 1024 partitions.  When I look at the job/stage summary, I see about
 400GB of shuffle writes, which seems to make sense as I'm doing a hash
 partition on this file.

 // Get the baseline input file
 JavaPairRDD<Text, Text> hsfBaselinePairRDDReadable =
     sc.hadoopFile(baselineInputBucketFile, SequenceFileInputFormat.class,
         Text.class, Text.class);

 JavaPairRDD<String, String> hsfBaselinePairRDD =
     hsfBaselinePairRDDReadable
         .mapToPair(new ConvertFromWritableTypes())
         .partitionBy(new HashPartitioner(Variables.NUM_OUTPUT_PARTITIONS))
         .persist(StorageLevel.MEMORY_AND_DISK_SER());

 I then execute the following code (with a count to force execution), and what
 I find very strange is that the job/stage summary shows more than 340GB of
 shuffle read.  Why would there be any shuffle read in this step?  I would
 expect little (if any) shuffle read here.

 // Use 'substring' to extract the epoch value from each record.
 JavaPairRDD<String, Long> baselinePairRDD =
     hsfBaselinePairRDD
         .mapValues(new ExtractEpoch(accumBadBaselineRecords))
         .persist(StorageLevel.MEMORY_AND_DISK_SER());

 log.info("Number of baseline records: " + baselinePairRDD.count());

 Both hsfBaselinePairRDD and baselinePairRDD have 1024 partitions.

 Any insights would be appreciated.

 I'm using Spark 1.2.0 in a stand-alone cluster.


 Darin.

