That makes sense. Thanks everyone for the explanations!

Mingyu
From: Matei Zaharia <matei.zaha...@gmail.com>
Reply-To: "user@spark.apache.org" <user@spark.apache.org>
Date: Tuesday, July 15, 2014 at 3:00 PM
To: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: How does Spark speculation prevent duplicated work?

Yeah, this is handled by the "commit" call of the FileOutputFormat. In general, Hadoop OutputFormats have a concept called "committing" the output, which you should do only once per partition. In the file-based ones, commit does an atomic rename to make sure that the final output is a complete file.

Matei

On Jul 15, 2014, at 2:49 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> At a high level, HDFS file writing works as follows: each attempt to write a partition starts by writing to a unique temporary file (say, something like targetDirectory/_temp/part-XXXXX_attempt-YYYY). If the write successfully completes, the temporary file is moved to the final location (say, targetDirectory/part-XXXXX). If, due to speculative execution, a file already exists in the final intended location, the move is avoided. Or it's overwritten; I forget the implementation. Either way, all attempts to write the same partition will always write the same data to the temp file (assuming the Spark transformation generating the data is deterministic and idempotent), and once one attempt succeeds, the final file will have the same data. Hence, writing to HDFS / S3 is idempotent.
>
> This logic is already implemented within Hadoop's MapReduce code, and Spark just uses it directly.
>
> TD
>
> On Tue, Jul 15, 2014 at 2:33 PM, Mingyu Kim <m...@palantir.com> wrote:
>> Thanks for the explanation, guys.
>>
>> I looked into the saveAsHadoopFile implementation a little bit.
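[Editor's note: the temp-file-then-atomic-rename commit that Matei and TD describe can be sketched as follows. This is a minimal illustration of the pattern, not Spark's or Hadoop's actual code; the function name and path layout are made up, and the exists-check-then-rename is done atomically by HDFS itself, whereas this local sketch has a benign race between the two steps.]

```python
import os

def write_partition(task_tmp_dir, final_dir, partition_id, attempt_id, records):
    """Write a partition's output to a per-attempt temp file, then 'commit' it
    with an atomic rename. Illustrative only -- not Spark's actual layout."""
    os.makedirs(task_tmp_dir, exist_ok=True)
    os.makedirs(final_dir, exist_ok=True)
    # Each attempt writes to its own unique temp file, so concurrent
    # speculative attempts never clobber each other mid-write.
    tmp_path = os.path.join(task_tmp_dir, f"part-{partition_id:05d}_attempt-{attempt_id}")
    with open(tmp_path, "w") as f:
        for r in records:
            f.write(f"{r}\n")
    final_path = os.path.join(final_dir, f"part-{partition_id:05d}")
    if os.path.exists(final_path):
        # Another attempt already committed (identical) data; discard ours.
        os.remove(tmp_path)
    else:
        os.replace(tmp_path, final_path)  # atomic rename on POSIX filesystems
    return final_path
```

Because every attempt writes the same deterministic data, it does not matter which attempt's rename wins: the final file is complete and identical either way.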
>> If you look at
>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
>> at line 843, the HDFS write happens in the per-partition processing, not in the result handling, so I have a feeling that it might be writing multiple times. This may be fine if both tasks for the same partition complete, because the second simply overwrites the output partition with the same content, but it could be an issue if one of the tasks completes while the other is still in the middle of writing the partition when the entire stage completes. Can someone explain this?
>>
>> Bertrand, I'm slightly confused about your comment. Is it the case that HDFS handles the write as a temp-file write followed by an atomic move, so the concern I had above is handled at the HDFS level?
>>
>> Mingyu
>>
>> From: Bertrand Dechoux <decho...@gmail.com>
>> Reply-To: "user@spark.apache.org" <user@spark.apache.org>
>> Date: Tuesday, July 15, 2014 at 1:22 PM
>> To: "user@spark.apache.org" <user@spark.apache.org>
>> Subject: Re: How does Spark speculation prevent duplicated work?
>>
>> I haven't looked at the implementation, but what you would do with any filesystem is write to a file inside the working directory of the task, and then only the attempt of the task that should be kept performs a move to the final path. The other attempts are simply discarded.
>> For most filesystems (and that's the case for HDFS), a 'move' is a very simple and fast action, because only the "full path/name" of the file changes, not its content or where that content is physically stored.
>>
>> Speculative execution happens in Hadoop MapReduce, and Spark has the same concept. As long as you apply functions with no side effects (i.e. the only impact is the returned result), you just need to not take into account results from additional attempts of the same task/operator.
>>
>> Bertrand Dechoux
>>
>> On Tue, Jul 15, 2014 at 9:34 PM, Andrew Ash <and...@andrewash.com> wrote:
>>> Hi Nan,
>>>
>>> Great digging in -- that makes sense to me for when a job is producing some output handled by Spark, like a .count or .distinct or similar.
>>>
>>> For the other part of the question, I'm also interested in side effects like an HDFS disk write. If one task is writing to an HDFS path and another task starts up, wouldn't it also attempt to write to the same path? How is that de-conflicted?
>>>
>>> On Tue, Jul 15, 2014 at 3:02 PM, Nan Zhu <zhunanmcg...@gmail.com> wrote:
>>>> Hi Mingyu,
>>>>
>>>> According to my understanding,
>>>>
>>>> Spark processes the result generated from each partition by passing it to a resultHandler (SparkContext.scala L1056).
>>>>
>>>> This resultHandler usually just puts the result in a driver-side array, the length of which is always partitions.size.
>>>>
>>>> This design effectively ensures that the actions are idempotent.
>>>>
>>>> e.g.
>>>> the count is implemented as
>>>>
>>>> def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
>>>>
>>>> Even if the task for a partition is executed twice, the result put in the array is the same.
>>>>
>>>> At the same time, I think the Spark implementation ensures that the operation applied to the return value of SparkContext.runJob will not be triggered again when duplicate tasks finish.
>>>>
>>>> Because, when a task is finished, the code execution path is TaskSetManager.handleSuccessfulTask -> DAGScheduler.taskEnded.
>>>>
>>>> In taskEnded, it triggers the CompletionEvent message handler, where DAGScheduler checks if (!job.finished(rt.outputId)), and rt.outputId is the partition id.
>>>>
>>>> So even if a duplicate task sends a CompletionEvent message, it will find job.finished(rt.outputId) is already true.
>>>>
>>>> Maybe I'm wrong; I just went through the code roughly. Welcome to correct me.
>>>>
>>>> Best,
>>>>
>>>> --
>>>> Nan Zhu
>>>>
>>>> On Tuesday, July 15, 2014 at 1:55 PM, Mingyu Kim wrote:
>>>>>
>>>>> Hi all,
>>>>>
>>>>> I was curious about the details of Spark speculation. My understanding is that, when "speculated" tasks are newly scheduled on other machines, the original tasks are still running until the entire stage completes. This seems to leave some room for duplicated work because some Spark actions are not idempotent. For example, it may count a partition twice in the case of RDD.count, or may write a partition to HDFS twice in the case of RDD.save*(). How does Spark prevent this kind of duplicated work?
>>>>>
>>>>> Mingyu
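[Editor's note: the driver-side completion handling Nan Zhu walks through above can be sketched as follows. This is an illustrative model, not Spark's actual DAGScheduler code; the function and variable names are made up, with `finished` playing the role of job.finished and the loop body playing the role of the resultHandler.]

```python
def run_job(num_partitions, completion_events):
    """Model of idempotent driver-side result handling: results land in a
    fixed-size array indexed by partition, and a per-partition 'finished'
    flag turns duplicate (speculative) completions into no-ops."""
    results = [None] * num_partitions
    finished = [False] * num_partitions
    remaining = num_partitions
    for partition_id, value in completion_events:  # may contain duplicates
        if finished[partition_id]:
            continue  # duplicate attempt for an already-finished partition
        results[partition_id] = value   # the "resultHandler": one slot per partition
        finished[partition_id] = True
        remaining -= 1
        if remaining == 0:
            break  # job done; later events for finished partitions are ignored
    return results

# count()-style usage: each attempt reports its partition's size, and a
# duplicate completion for partition 0 does not double-count.
sizes = run_job(3, [(0, 10), (1, 20), (0, 10), (2, 5)])
assert sum(sizes) == 35
```

Since both attempts of a deterministic task produce the same value, it also would not matter if the duplicate *did* overwrite its slot; the finished-flag check just guarantees downstream logic fires once per partition.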