Thanks Yong,

My script is pretty straightforward:

sc.textFile("/wc/input").flatMap(line => line.split(" "))
  .map(word => (word, 1)).reduceByKey(_ + _)
  .saveAsTextFile("/wc/out2")  // both paths are on HDFS
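
To make the shuffle step concrete, here is a minimal plain-Scala sketch (no Spark required) of how I understand the word count to be split across a map side and a reduce side. It only models the data flow: nothing is written to disk, and the names ShuffleSketch, mapSide, reduceSide and numReducers are made up for illustration and are not Spark APIs. The bucketing by non-negative hash modulo is assumed to be similar in spirit to hash partitioning.

```scala
// Plain-Scala model of the word-count shuffle; illustrative names only.
object ShuffleSketch {
  val numReducers = 2

  // Assign a word to a reduce partition: non-negative hash modulo.
  def partitionOf(word: String): Int =
    Math.floorMod(word.hashCode, numReducers)

  // Map side: split lines into words, combine counts locally, then group the
  // combined (word, count) pairs into one segment per reduce partition.
  def mapSide(lines: Seq[String]): Map[Int, Map[String, Int]] = {
    val combined: Map[String, Int] = lines
      .flatMap(_.split(" "))
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size }
    combined.groupBy { case (word, _) => partitionOf(word) }
  }

  // Reduce side: take this partition's segment from every map output and
  // merge the partial counts.
  def reduceSide(partition: Int,
                 mapOutputs: Seq[Map[Int, Map[String, Int]]]): Map[String, Int] =
    mapOutputs
      .flatMap(_.getOrElse(partition, Map.empty[String, Int]))
      .groupBy(_._1)
      .map { case (word, pairs) => word -> pairs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    // Two input splits stand in for two map tasks.
    val outputs = Seq(mapSide(Seq("a b a", "c a")), mapSide(Seq("b b c")))
    val result = (0 until numReducers).flatMap(p => reduceSide(p, outputs)).toMap
    println(result)
  }
}
```

In this model each map output holds at most one entry per distinct word in its split, which is why the segments stay small even when the input is large.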

So if every shuffle write always goes to disk, what is the
meaning of these properties?

spark.shuffle.memoryFraction
spark.shuffle.spill
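
For reference, a sketch of how these two properties would be set from code, assuming a Spark 1.x build on the classpath. As far as I can tell from the Spark 1.x configuration docs, spark.shuffle.spill (default true) limits the memory used during reduces by spilling to disk, and spark.shuffle.memoryFraction (default 0.2) is the fraction of the Java heap that the in-memory maps used for shuffle aggregation may occupy before spilling starts. On that reading they govern the in-memory aggregation buffers, not whether the map output files are written. The object name and application name below are hypothetical, and the values are just the documented defaults, not tuning advice.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver object; submit with spark-submit so the master is set.
object WordCountConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("wordcount-conf-sketch")          // hypothetical app name
      .set("spark.shuffle.spill", "true")           // documented 1.x default
      .set("spark.shuffle.memoryFraction", "0.2")   // documented 1.x default
    val sc = new SparkContext(conf)

    // Same pipeline as the script above, run with the explicit settings.
    sc.textFile("/wc/input")
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile("/wc/out2")

    sc.stop()
  }
}
```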

Thanks,
Kartik




On Fri, Oct 2, 2015 at 6:22 AM, java8964 <java8...@hotmail.com> wrote:

> No problem.
>
> On the mapper side, Spark is very similar to MapReduce; but on the
> reducer fetching side, MR uses a sort-merge while Spark uses a HashMap.
>
> So keep in mind that you get data automatically sorted on the reducer
> side in MR, but not in Spark.
>
> Spark's performance comes from:
>
>    - The ability to cache, and smart arrangement of tasks into stages.
>    - Intermediate data between stages is never stored in HDFS, only on
>    local disk. In MR, the intermediate data passed from one MR job to the
>    next is stored in HDFS.
>    - Spark uses threads to run tasks, instead of heavyweight processes as
>    in MR.
>
>
> Without caching, in my experience, Spark can be about 2x to 5x faster
> than an MR job, depending on the job logic. If the data volume is small,
> Spark will do even better, as a process is far more expensive than a
> thread in this case.
>
> I didn't see your Spark script, so my guess is that you are using
> "rdd.collect()", which will transfer the final result to driver and dump it
> in the console.
>
> Yong
>
> ------------------------------
> Date: Fri, 2 Oct 2015 00:50:24 -0700
> Subject: Re: Problem understanding spark word count execution
> From: kar...@bluedata.com
> To: java8...@hotmail.com
> CC: nicolae.maras...@adswizz.com; user@spark.apache.org
>
>
> Thanks Yong,
>
> That was the good explanation I was looking for. However, I have one
> doubt. You write: "Imagine that you have 2 mappers to read the data, then
> each mapper will generate the (word, count) tuple output in segments.
> Spark always writes that output to a local file. (In fact, one file with
> different segments to represent different partitions)". If this is true,
> then Spark is very similar to Hadoop MapReduce (disk I/O between phases).
> With so much I/O after each stage, how does Spark achieve the performance
> that it does compared to MapReduce? Another doubt is about "The 2000
> bytes sent to driver is the final output aggregated on the reducers end,
> and merged back to the driver." Which part of our word count code takes
> care of this? And yes, there are only 273 distinct words in the text, so
> that's not a surprise.
>
> Thanks again,
>
> Hope to get a reply.
>
> --Kartik
>
> On Thu, Oct 1, 2015 at 5:49 PM, java8964 <java8...@hotmail.com> wrote:
>
> I am not sure about the original explanation of shuffle write.
>
> In the word count example, the shuffle is needed, as Spark has to group by
> the word (reduceByKey is more accurate here). Imagine that you have 2
> mappers to read the data, then each mapper will generate the (word, count)
> tuple output in segments. Spark always writes that output to a local file.
> (In fact, one file with different segments to represent different
> partitions).
>
> As you can imagine, these segments will be small, as they only contain
> (word, count of word) tuples. After each mapper generates this segmented
> file for the different partitions, each reducer will fetch the partitions
> belonging to itself.
>
> In your job summary, if your source is a text file, then your data
> corresponds to 2 HDFS blocks, or 2x256M. There are 2 tasks concurrently
> reading these 2 partitions, with about 2.5M lines of data processed in
> each partition.
>
> The output of each partition is shuffle-writing 2.7K data, which is the
> size of the segment file generated, corresponding to all the unique words
> and their count of this partition. So the size is reasonable, at least for
> me.
>
> The interesting number is 273 shuffle write records. I am not 100% sure
> of its meaning. Does it mean that this partition has 273 unique words in
> these 2.5M lines of data? That is kind of low, but I really don't have
> any other explanation of its meaning.
>
> If your final output shows hundreds of unique words, then that is it.
>
> The 2000 bytes sent to driver is the final output aggregated on the
> reducers end, and merged back to the driver.
>
> Yong
>
>
> ------------------------------
> Date: Thu, 1 Oct 2015 13:33:59 -0700
> Subject: Re: Problem understanding spark word count execution
> From: kar...@bluedata.com
> To: nicolae.maras...@adswizz.com
> CC: user@spark.apache.org
>
>
> Hi Nicolae,
> Thanks for the reply. To further clarify things -
>
> sc.textFile is reading from HDFS. Shouldn't the file be read in such a
> way that EACH executor works only on the part of the file available
> locally? In this case it's a ~4.64 GB file and the block size is 256 MB,
> so approx 19 partitions will be created and each task will run on 1
> partition (which is what I am seeing in the stage logs). I also assume it
> will read the file in a way that gives each executor exactly the same
> amount of data, so there shouldn't be any shuffling in reading, at least.
>
> During the stage 0 (sc.textFile -> flatMap -> Map) for every task this is
> the output I am seeing
>
> Field                         First task           Second task
> Index                         0                    1
> ID                            44                   45
> Attempt                       0                    0
> Status                        SUCCESS              SUCCESS
> Locality Level                NODE_LOCAL           NODE_LOCAL
> Executor ID / Host            1 / 10.35.244.10     2 / 10.35.244.11
> Launch Time                   2015/09/29 13:57:24  2015/09/29 13:57:24
> Duration                      14 s                 13 s
> GC Time                       0.2 s                0.2 s
> Input Size (hadoop)           256.0 MB             256.0 MB
> Input Records                 2595161              2595176
> Write Time                    (empty)              (empty)
> Shuffle Write Size / Records  2.7 KB / 273         2.7 KB / 273
> Errors                        (empty)              (empty)
>
> I have the following questions -
>
> 1) What exactly is the 2.7 KB of shuffle write?
> 2) Is this 2.7 KB of shuffle write local to that executor?
> 3) In the executor's log I am seeing 2000 bytes of results sent to the
> driver. If instead this number were much greater than 2000 bytes, such
> that it does not fit in the executor's memory, would shuffle write
> increase?
> 4) For word count, 256 MB of data is a substantial amount of text. How
> come the result for this stage is only 2000 bytes? It should send every
> word with its respective count, so for a 256 MB input shouldn't this
> result be much bigger?
>
> I hope I am clear this time.
>
> Hope to get a reply,
>
> Thanks
> Kartik
>
>
>
> On Thu, Oct 1, 2015 at 12:38 PM, Nicolae Marasoiu <
> nicolae.maras...@adswizz.com> wrote:
>
> Hi,
>
> So you say "sc.textFile -> flatMap -> Map".
>
> My understanding is like this:
> First, the number of partitions is determined, p of them. You can
> give a hint on this.
> Then the nodes which will load the p partitions are chosen, n of them
> (where n <= p).
>
> At roughly the same time or not, the n nodes start opening different
> sections of the file, the physical equivalent of the partitions: for
> instance in HDFS they would do an open and a seek, I guess, and just read
> from the stream there, converting to whatever the InputFormat dictates.
>
> The shuffle can only be the part where a node opens an HDFS file, for
> instance, but does not have a local replica of the blocks which it
> needs to read (those pertaining to its assigned partitions). So it needs
> to pick them up from remote nodes which do have replicas of that data.
>
> After blocks are read into memory, flatMap and Map are local computations
> generating new RDDs, and in the end the result is sent to the driver
> (whatever the terminating computation does on the RDD, like the result of
> reduce, or the side effects of rdd.foreach, etc).
>
> Maybe you can share more of your context if still unclear.
> I just made assumptions to give clarity on a similar thing.
>
> Nicu
> ------------------------------
> *From:* Kartik Mathur <kar...@bluedata.com>
> *Sent:* Thursday, October 1, 2015 10:25 PM
> *To:* Nicolae Marasoiu
> *Cc:* user
> *Subject:* Re: Problem understanding spark word count execution
>
> Thanks Nicolae,
> So in my case all executors are sending results back to the driver, and
> "shuffle is just sending out the textFile to distribute the
> partitions". Could you please elaborate on this? What exactly is in
> this file?
>
> On Wed, Sep 30, 2015 at 9:57 PM, Nicolae Marasoiu <
> nicolae.maras...@adswizz.com> wrote:
>
>
>
> Hi,
>
> 2- the end results are sent back to the driver; the shuffles are the
> transmission of intermediate results between nodes, such as at the "->"
> steps, which are all intermediate transformations.
>
> More precisely, since flatMap and map are narrow dependencies, meaning
> they can usually happen on the local node, I bet shuffle is just sending
> out the textFile to a few nodes to distribute the partitions.
>
>
> ------------------------------
> *From:* Kartik Mathur <kar...@bluedata.com>
> *Sent:* Thursday, October 1, 2015 12:42 AM
> *To:* user
> *Subject:* Problem understanding spark word count execution
>
> Hi All,
>
> I tried running the Spark word count and I have a couple of questions.
>
> I am analyzing stage 0, i.e.
> sc.textFile -> flatMap -> Map (word count example)
>
> 1) In the stage logs under the Application UI details, for every task I
> am seeing Shuffle write as 2.7 KB. Question: how can I know where this
> task wrote to, i.e. how many bytes to which executor?
>
> 2) In the executor's log, when I look for the same task, it says 2000
> bytes of result is sent to the driver. My question is: if the results
> were sent directly to the driver, what is this shuffle write?
>
> Thanks,
> Kartik
>
>
>
>
>
