That makes sense. Thanks everyone for the explanations!

Mingyu

From:  Matei Zaharia <matei.zaha...@gmail.com>
Reply-To:  "user@spark.apache.org" <user@spark.apache.org>
Date:  Tuesday, July 15, 2014 at 3:00 PM
To:  "user@spark.apache.org" <user@spark.apache.org>
Subject:  Re: How does Spark speculation prevent duplicated work?

Yeah, this is handled by the "commit" call of the FileOutputFormat. In
general, Hadoop OutputFormats have a concept of "committing" the output,
which should be done only once per partition. In the file-based ones, the
commit does an atomic rename to make sure that the final output is a
complete file.
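
To make that concrete, here is a minimal plain-Scala sketch of the
write-to-temp-then-atomically-rename pattern (an analogue only, not the
actual FileOutputCommitter code; the paths and attempt id are made up):

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardCopyOption}

// Each attempt writes to its own temporary file first.
val attemptId = "attempt-0001"
val tmpPath   = Paths.get(s"/data/out/_temporary/part-00000_$attemptId")
val finalPath = Paths.get("/data/out/part-00000")

Files.createDirectories(tmpPath.getParent)
Files.write(tmpPath, "partition contents".getBytes(StandardCharsets.UTF_8))

// "Commit" = one atomic rename into the final location, done once per
// partition, so a half-written temp file never shows up as output.
if (!Files.exists(finalPath)) {
  Files.move(tmpPath, finalPath, StandardCopyOption.ATOMIC_MOVE)
} else {
  // Another (speculative) attempt already committed; drop this one.
  Files.deleteIfExists(tmpPath)
}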

Matei

On Jul 15, 2014, at 2:49 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> The way HDFS file writing works, at a high level, is that each attempt to
> write a partition starts by writing to a unique temporary file (say,
> something like targetDirectory/_temp/part-XXXXX_attempt-YYYY). If the write
> completes successfully, the temporary file is moved to the final location
> (say, targetDirectory/part-XXXXX). If, due to speculative execution, a file
> already exists in the final intended location, the move is either skipped or
> it overwrites the existing file (I forget the exact implementation). Either
> way, all attempts to write the same partition write the same data to their
> temp files (assuming the Spark transformation generating the data is
> deterministic and idempotent), so once any one attempt succeeds, the final
> file contains that same data. Hence, writing to HDFS / S3 is idempotent.
> 
> Now, this logic is already implemented within Hadoop's MapReduce code, and
> Spark just uses it directly.
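> 
> On HDFS that boils down to roughly the following (a rough sketch using the
> plain Hadoop FileSystem API, with made-up paths; the real logic lives in
> Hadoop's FileOutputCommitter, not in ad-hoc code like this):
> 
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileSystem, Path}
> 
> val fs  = FileSystem.get(new Configuration())
> val tmp = new Path("targetDirectory/_temp/part-00000_attempt-0001")
> val dst = new Path("targetDirectory/part-00000")
> 
> // rename() returns false if it cannot move the file (e.g. the destination
> // already exists because another attempt won), so a losing speculative
> // attempt just deletes its temp file.
> if (!fs.rename(tmp, dst)) {
>   fs.delete(tmp, false)
> }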
> 
> TD
> 
> 
> On Tue, Jul 15, 2014 at 2:33 PM, Mingyu Kim <m...@palantir.com> wrote:
>> Thanks for the explanation, guys.
>> 
>> I looked into the saveAsHadoopFile implementation a little bit. If you look at
>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
>> at line 843, the HDFS write happens during the per-partition processing, not
>> in the result handling, so I have a feeling that it might be writing multiple
>> times. This may be fine if both tasks for the same partition complete, because
>> the second would simply overwrite the output partition with the same content,
>> but it could be an issue if one of the tasks completes and the other is still
>> in the middle of writing the partition by the time the entire stage completes.
>> Can someone explain this?
>> 
>> Bertrand, I'm slightly confused about your comment. So, is it the case that
>> HDFS will handle the writes as a temp file write followed by an atomic move,
>> so the concern I had above is handled at the HDFS level?
>> 
>> Mingyu
>> 
>> From: Bertrand Dechoux <decho...@gmail.com>
>> Reply-To: "user@spark.apache.org" <user@spark.apache.org>
>> Date: Tuesday, July 15, 2014 at 1:22 PM
>> To: "user@spark.apache.org" <user@spark.apache.org>
>> Subject: Re: How does Spark speculation prevent duplicated work?
>> 
>> I haven't looked at the implementation, but what you would do with any
>> filesystem is write to a file inside the working directory of the task, and
>> then only the attempt of the task that should be kept performs a move to the
>> final path. The other attempts are simply discarded. For most filesystems
>> (and that's the case for HDFS), a 'move' is a very simple and fast action,
>> because only the full path/name of the file changes, not its content or
>> where that content is physically stored.
>> 
>> Speculative execution happens in Hadoop MapReduce, and Spark has the same
>> concept. As long as you apply functions with no side effects (i.e. the only
>> impact is the returned result), you just need to ignore the results from
>> additional attempts of the same task/operator.
>> 
>> Bertrand Dechoux
>> 
>> 
>> On Tue, Jul 15, 2014 at 9:34 PM, Andrew Ash <and...@andrewash.com> wrote:
>>> Hi Nan, 
>>> 
>>> Great digging in -- that makes sense to me for when a job is producing some
>>> output handled by Spark like a .count or .distinct or similar.
>>> 
>>> For the other part of the question, I'm also interested in side effects like
>>> an HDFS disk write.  If one task is writing to an HDFS path and another task
>>> starts up, wouldn't it also attempt to write to the same path?  How is that
>>> de-conflicted?
>>> 
>>> 
>>> On Tue, Jul 15, 2014 at 3:02 PM, Nan Zhu <zhunanmcg...@gmail.com> wrote:
>>>> Hi, Mingyu, 
>>>> 
>>>> According to my understanding,
>>>> 
>>>> Spark processes the results generated from each partition by passing them to
>>>> a resultHandler (SparkContext.scala L1056).
>>>> 
>>>> This resultHandler usually just puts the result into a driver-side array,
>>>> the length of which is always partitions.size.
>>>> 
>>>> This design effectively ensures that the actions are idempotent.
>>>> 
>>>> E.g., count is implemented as
>>>> 
>>>> def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
>>>> 
>>>> Even if the task for a partition is executed twice, the result put into
>>>> the array is the same.
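>>>> 
>>>> A tiny sketch of that idea (hypothetical names, not the actual
>>>> SparkContext code):
>>>> 
>>>> val numPartitions = 8
>>>> // one slot per partition; resultHandler writes into it by index
>>>> val results = new Array[Long](numPartitions)
>>>> def resultHandler(partitionId: Int, value: Long): Unit =
>>>>   results(partitionId) = value
>>>> 
>>>> // a speculative duplicate of partition 3 recomputes the same value, so
>>>> // writing it into results(3) a second time changes nothing
>>>> resultHandler(3, 42L)
>>>> resultHandler(3, 42L)
>>>> val total = results.sum   // what count() effectively returns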
>>>> 
>>>> 
>>>> 
>>>> At the same time, I think the Spark implementation ensures that the
>>>> operation applied to the return value of SparkContext.runJob will not be
>>>> triggered again when the duplicate tasks finish.
>>>> 
>>>> Because, 
>>>> 
>>>> 
>>>> when a task is finished, the code execution path is
>>>> TaskSetManager.handleSuccessfulTask -> DAGScheduler.taskEnded
>>>> 
>>>> In taskEnded, it triggers the CompletionEvent message handler, where
>>>> DAGScheduler checks if (!job.finished(rt.outputId)), and rt.outputId is
>>>> the partition id.
>>>> 
>>>> So even if a duplicate task invokes a CompletionEvent message, it will
>>>> find that job.finished(rt.outputId) is already true.
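>>>> 
>>>> Roughly, the guard looks like this (continuing the hypothetical names from
>>>> the sketch above, not the real DAGScheduler code):
>>>> 
>>>> val finished = Array.fill(numPartitions)(false)
>>>> var numFinished = 0
>>>> 
>>>> def handleCompletion(partitionId: Int, value: Long): Unit =
>>>>   if (!finished(partitionId)) {   // first attempt to finish wins
>>>>     finished(partitionId) = true
>>>>     resultHandler(partitionId, value)
>>>>     numFinished += 1
>>>>   }                               // a duplicate completion is ignored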
>>>> 
>>>> 
>>>> Maybe I am wrong… I just went through the code roughly; feel free to correct me.
>>>> 
>>>> Best,
>>>> 
>>>> 
>>>> -- 
>>>> Nan Zhu
>>>> 
>>>> On Tuesday, July 15, 2014 at 1:55 PM, Mingyu Kim wrote:
>>>>> 
>>>>> Hi all,
>>>>> 
>>>>> I was curious about the details of Spark speculation. So, my understanding
>>>>> is that, when "speculated" tasks are newly scheduled on other machines,
>>>>> the original tasks are still running until the entire stage completes.
>>>>> This seems to leave some room for duplicated work, because some Spark
>>>>> actions are not idempotent. For example, it may be counting a partition
>>>>> twice in case of RDD.count or may be writing a partition to HDFS twice in
>>>>> case of RDD.save*(). How does it prevent this kind of duplicated work?
>>>>> 
>>>>> Mingyu
>>>>> 
>>>> 
>>> 
>> 
> 


