Thanks Yong, my script is pretty straightforward -

sc.textFile("/wc/input").flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_).saveAsTextFile("/wc/out2") // both paths are HDFS

So if every shuffle write always goes to disk, what is the meaning of these
properties -

spark.shuffle.memoryFraction
spark.shuffle.spill

Thanks,
Kartik

On Fri, Oct 2, 2015 at 6:22 AM, java8964 <java8...@hotmail.com> wrote:

> No problem.
>
> From the mapper side, Spark is very similar to MapReduce; but on the
> reducer fetching side, MR uses a sort merge whereas Spark uses a HashMap.
>
> So keep in mind that you get the data automatically sorted on the
> reducer side in MR, but not in Spark.
>
> Spark's performance comes from:
>
> - Caching, and smart arrangement of tasks into stages.
> - Intermediate data between stages is never stored in HDFS, only on
> local disk. In MR, the intermediate data from one MR job to the next is
> stored in HDFS.
> - Spark uses threads to run tasks, instead of heavy processes as MR does.
>
> Without caching, in my experience, Spark gets about 2x to 5x better
> performance than an MR job, depending on the job logic. If the data volume
> is small, Spark does even better, as a process is far more expensive than
> a thread in this case.
>
> I didn't see your Spark script, so my guess is that you are using
> "rdd.collect()", which will transfer the final result to the driver and
> dump it to the console.
>
> Yong
>
> ------------------------------
> Date: Fri, 2 Oct 2015 00:50:24 -0700
> Subject: Re: Problem understanding spark word count execution
> From: kar...@bluedata.com
> To: java8...@hotmail.com
> CC: nicolae.maras...@adswizz.com; user@spark.apache.org
>
> Thanks Yong,
>
> That was a good explanation I was looking for; however, I have one doubt.
> You write - "Imagine that you have 2 mappers to read the data; then each
> mapper will generate the (word, count) tuple output in segments. Spark
> always writes that to a local file.
> (In fact, one file with different segments to represent different
> partitions.)" If this is true, then Spark is very similar to Hadoop
> MapReduce (disk IO between phases); with so many IOs after each stage, how
> does Spark achieve the performance it does compared to MapReduce? Another
> doubt: "The 2000 bytes sent to the driver is the final output aggregated
> on the reducers' end, and merged back to the driver." Which part of our
> word count code takes care of this? And yes, there are only 273 distinct
> words in the text, so that's not a surprise.
>
> Thanks again,
>
> Hope to get a reply.
>
> --Kartik
>
> On Thu, Oct 1, 2015 at 5:49 PM, java8964 <java8...@hotmail.com> wrote:
>
> I am not sure about the original explanation of the shuffle write.
>
> In the word count example, the shuffle is needed, as Spark has to group
> by the word (reduceByKey is more accurate here). Imagine that you have 2
> mappers to read the data; then each mapper will generate the (word, count)
> tuple output in segments. Spark always writes that to a local file. (In
> fact, one file with different segments to represent different partitions.)
>
> As you can imagine, the output of these segments will be small, as it
> only contains (word, count of word) tuples. After each mapper generates
> this segmented file for the different partitions, each reducer will fetch
> the partitions belonging to itself.
>
> In your job summary, since your source is a text file, your data
> corresponds to 2 HDFS blocks, or 2x256M. There are 2 tasks concurrently
> reading these 2 partitions, with about 2.5M lines of data in each
> partition being processed.
>
> The output of each partition is a shuffle write of 2.7K of data, which is
> the size of the segment file generated, corresponding to all the unique
> words and their counts in this partition. So the size is reasonable, at
> least to me.
>
> The interesting number is 273 as the shuffle write records. I am not 100%
> sure of its meaning.
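Yong's segment-and-fetch description can be sketched in plain Python. This is an illustrative simulation, not Spark code: the two mapper partitions, the sample lines, and the hash routing are all assumptions, but the flow mirrors the description - each mapper counts words locally into a small segment, and each reducer fetches only the keys hashed to it.

```python
from collections import Counter

def map_side(lines):
    """Each mapper counts words locally, producing (word, count) tuples.
    The result is the small 'segment' Spark writes to local disk."""
    c = Counter()
    for line in lines:
        c.update(line.split())
    return dict(c)

def shuffle_fetch(segments, num_reducers=2):
    """Each reducer fetches, from every mapper's segment file,
    only the words that hash to its own partition."""
    reducers = [Counter() for _ in range(num_reducers)]
    for seg in segments:
        for word, n in seg.items():
            reducers[hash(word) % num_reducers][word] += n
    return reducers

# Two toy "mappers", each reading its own partition of the input.
part1 = ["the quick brown fox", "the lazy dog"]
part2 = ["the dog barks", "a quick fox"]
segments = [map_side(part1), map_side(part2)]

# Reducers merge the per-mapper partial counts for their keys.
final = shuffle_fetch(segments)
total = Counter()
for r in final:
    total.update(r)
print(total["the"])  # 3: merged across both mappers' segments
```

Note that each segment holds one record per unique word in that mapper's partition, not one per input word - which is why the shuffle write stays small even for a large input.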
> Does it mean that this partition has 273 unique words from these 2.5M
> lines of data? That is kind of low, but I really don't have another
> explanation of its meaning.
>
> If your final output shows hundreds of unique words, then it does.
>
> The 2000 bytes sent to the driver is the final output aggregated on the
> reducers' end, and merged back to the driver.
>
> Yong
>
> ------------------------------
> Date: Thu, 1 Oct 2015 13:33:59 -0700
> Subject: Re: Problem understanding spark word count execution
> From: kar...@bluedata.com
> To: nicolae.maras...@adswizz.com
> CC: user@spark.apache.org
>
> Hi Nicolae,
> Thanks for the reply. To further clarify things -
>
> sc.textFile is reading from HDFS; now shouldn't the file be read in a way
> such that EACH executor works only on the local copy of the file part
> available to it? In this case it's a ~4.64 GB file and the block size is
> 256 MB, so approx 19 partitions will be created and each task will run on
> 1 partition (which is what I am seeing in the stage logs). Also, I assume
> it will read the file in a way that each executor has exactly the same
> amount of data, so there shouldn't be any shuffling in reading, at least.
>
> During stage 0 (sc.textFile -> flatMap -> Map), for every task this is
> the output I am seeing:
>
> Index | ID | Attempt | Status  | Locality Level | Executor ID / Host | Launch Time         | Duration | GC Time | Input Size / Records        | Shuffle Write Size / Records
> 0     | 44 | 0       | SUCCESS | NODE_LOCAL     | 1 / 10.35.244.10   | 2015/09/29 13:57:24 | 14 s     | 0.2 s   | 256.0 MB (hadoop) / 2595161 | 2.7 KB / 273
> 1     | 45 | 0       | SUCCESS | NODE_LOCAL     | 2 / 10.35.244.11   | 2015/09/29 13:57:24 | 13 s     | 0.2 s   | 256.0 MB (hadoop) / 2595176 | 2.7 KB / 273
>
> I have the following questions -
>
> 1) What exactly is 2.7 KB of shuffle write?
> 2) Is this 2.7 KB of shuffle write local to that executor?
> 3) In the executor's log I am seeing 2000 bytes of results sent to the
> driver; if instead this number were much, much greater than 2000 bytes,
> such that it did not fit in the executor's memory, would the shuffle write
> increase?
> 4) For word count, 256 MB of data is a substantial amount of text. How
> come the result for this stage is only 2000 bytes? It should send every
> word with its respective count; for a 256 MB input this result should be
> much bigger.
>
> I hope I am clear this time.
>
> Hope to get a reply,
>
> Thanks
> Kartik
>
> On Thu, Oct 1, 2015 at 12:38 PM, Nicolae Marasoiu <
> nicolae.maras...@adswizz.com> wrote:
>
> Hi,
>
> So you say "sc.textFile -> flatMap -> Map".
>
> My understanding is like this:
> First, a number of partitions is determined, p of them. You can give a
> hint on this.
> Then the nodes which will load the partitions are chosen, n of them
> (where n <= p).
>
> Relatively at the same time or not, the n nodes start opening different
> sections of the file - the physical equivalent of the partitions: for
> instance in HDFS they would do an open and a seek, I guess, and just read
> from the stream there, converting to whatever the InputFormat dictates.
>
> The shuffle can only be the part where a node opens an HDFS file, for
> instance, but does not have a local replica of the blocks which it needs
> to read (those pertaining to its assigned partitions). So it needs to pick
> them up from remote nodes which do have replicas of that data.
>
> After the blocks are read into memory, flatMap and Map are local
> computations generating new RDDs, and in the end the result is sent to
> the driver (whatever the terminating computation does with the RDD, like
> the result of reduce, or the side effects of rdd.foreach, etc.).
>
> Maybe you can share more of your context if it is still unclear.
> I just made assumptions to give clarity on a similar thing.
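Nicolae's first step (determining the number of partitions, p) matches the arithmetic mentioned earlier in the thread. A quick back-of-the-envelope check, using the ~4.64 GB file and 256 MB HDFS block size reported there:

```python
import math

# One partition per HDFS block, so partitions ~= ceil(file size / block size).
file_size_gb = 4.64    # file size from the thread
block_size_mb = 256    # HDFS block size from the thread

partitions = math.ceil(file_size_gb * 1024 / block_size_mb)
print(partitions)  # 19, matching the ~19 tasks seen in the stage logs
```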
>
> Nicu
> ------------------------------
> From: Kartik Mathur <kar...@bluedata.com>
> Sent: Thursday, October 1, 2015 10:25 PM
> To: Nicolae Marasoiu
> Cc: user
> Subject: Re: Problem understanding spark word count execution
>
> Thanks Nicolae,
> So in my case all executors are sending results back to the driver, and
> "shuffle is just sending out the textFile to distribute the partitions".
> Could you please elaborate on this? What exactly is in this file?
>
> On Wed, Sep 30, 2015 at 9:57 PM, Nicolae Marasoiu <
> nicolae.maras...@adswizz.com> wrote:
>
> Hi,
>
> 2 - The end results are sent back to the driver; the shuffles are
> transmissions of intermediate results between nodes, such as the -> steps,
> which are all intermediate transformations.
>
> More precisely, since flatMap and map are narrow dependencies, meaning
> they can usually happen on the local node, I bet the shuffle is just
> sending out the textFile to a few nodes to distribute the partitions.
>
> ------------------------------
> From: Kartik Mathur <kar...@bluedata.com>
> Sent: Thursday, October 1, 2015 12:42 AM
> To: user
> Subject: Problem understanding spark word count execution
>
> Hi All,
>
> I tried running Spark word count and I have a couple of questions -
>
> I am analyzing stage 0, i.e.
> sc.textFile -> flatMap -> Map (word count example)
>
> 1) In the Stage logs under the Application UI details, for every task I
> am seeing a shuffle write of 2.7 KB. Question - how can I know where this
> task wrote all of it? Like how many bytes to which executor?
>
> 2) In the executor's log, when I look at the same task, it says 2000
> bytes of results were sent to the driver. My question is, if the results
> were directly sent to the driver, what is this shuffle write?
>
> Thanks,
> Kartik
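On question 1, a rough size check suggests why the shuffle write is only 2.7 KB per task: it holds one (word, count) record per unique word in the partition (273 of them, per the stage log), not one record per input word. The ~10 bytes per record below is an assumed figure for illustration only:

```python
# Back-of-the-envelope size of a map-side shuffle segment:
# records is the per-task shuffle write record count from the stage log;
# bytes_per_record (a short word plus a small encoded count) is assumed.
records = 273
bytes_per_record = 10

approx_kb = records * bytes_per_record / 1024
print(round(approx_kb, 1))  # 2.7, matching the 2.7 KB in the stage log
```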