Hi Darin,

When you say you "see 400GB of shuffle writes" from the first code snippet,
what do you mean?  There is no action in that first set, so it won't do
anything.  By itself, it won't do any shuffle writing, or anything else for
that matter.

Most likely, the .count() in your second code snippet is actually causing
the execution of some of the first snippet as well.  The .partitionBy will
result in both shuffle writes and shuffle reads, but they aren't set in
motion until the .count() further down the line.  It's confusing because the
stage boundaries don't line up exactly with your RDD variables here:
hsfBaselinePairRDD "spans" two stages, and baselinePairRDD actually gets
merged into the stage above it.
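
One way to see this is to print the RDD lineage before triggering anything.
Just a sketch (toDebugString only inspects the DAG, it doesn't run a job):

// print the lineage of the final RDD; the output should include a ShuffledRDD
// entry where the partitionBy shuffle (and hence the stage boundary) falls,
// which won't line up neatly with your variable names
System.out.println(baselinePairRDD.toDebugString());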

If you do a hsfBaselinePairRDD.count() right after your first code snippet,
and then run the second code snippet, is it more like what you expect?
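
Something like this, as a rough sketch (reusing your variable names; the
exact shuffle numbers will depend on what stays cached):

// force execution of the first snippet on its own -- this job should show
// the shuffle write (and the matching shuffle read) from the partitionBy
log.info("Baseline records (first count): " + hsfBaselinePairRDD.count());

// the second snippet should now read from the persisted partitions, so its
// count() should show little or no shuffle (assuming nothing was evicted)
JavaPairRDD<String, Long> baselinePairRDD =
    hsfBaselinePairRDD
        .mapValues(new ExtractEpoch(accumBadBaselineRecords))
        .persist(StorageLevel.MEMORY_AND_DISK_SER());

log.info("Number of baseline records: " + baselinePairRDD.count());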

Imran

On Tue, Feb 17, 2015 at 1:52 PM, Darin McBeath <ddmcbe...@yahoo.com.invalid>
wrote:

> In the following code, I read in a large sequence file from S3 (1TB)
> spread across 1024 partitions.  When I look at the job/stage summary, I see
> about 400GB of shuffle writes which seems to make sense as I'm doing a hash
> partition on this file.
>
> // Get the baseline input file
> JavaPairRDD<Text, Text> hsfBaselinePairRDDReadable =
>     sc.hadoopFile(baselineInputBucketFile, SequenceFileInputFormat.class,
>                   Text.class, Text.class);
>
> JavaPairRDD<String, String> hsfBaselinePairRDD =
>     hsfBaselinePairRDDReadable
>         .mapToPair(new ConvertFromWritableTypes())
>         .partitionBy(new HashPartitioner(Variables.NUM_OUTPUT_PARTITIONS))
>         .persist(StorageLevel.MEMORY_AND_DISK_SER());
>
> I then execute the following code (with a count to force execution) and
> what I find very strange is that when I look at the job/stage summary, I
> see more than 340GB of shuffle read.  Why would there be any shuffle read
> in this step?  I would expect there to be little (if any) shuffle reads in
> this step.
>
> // Use 'substring' to extract the epoch value from each record.
> JavaPairRDD<String, Long> baselinePairRDD =
>     hsfBaselinePairRDD
>         .mapValues(new ExtractEpoch(accumBadBaselineRecords))
>         .persist(StorageLevel.MEMORY_AND_DISK_SER());
>
> log.info("Number of baseline records: " + baselinePairRDD.count());
>
> Both hsfBaselinePairRDD and baselinePairRDD have 1024 partitions.
>
> Any insights would be appreciated.
>
> I'm using Spark 1.2.0 in a stand-alone cluster.
>
>
> Darin.
>
