Okay, this looks good. Since I'm doing the move in commit(), the close() method in the output would be a no-op (which seems fine).
Thanks!
Thad

On Wed, Jul 23, 2014 at 5:37 PM, Bikas Saha <[email protected]> wrote:

> The processor is expected to get the commit permission from the AM (be
> told it’s the valid task) and then call output.commit().
>
> The framework calls output.close() upon task completion irrespective of
> whether the task succeeded/failed/killed/could-not-commit. close() is for
> cleanup and not for commit.
>
> Bikas
>
> *From:* Thaddeus Diamond [mailto:[email protected]]
> *Sent:* Wednesday, July 23, 2014 1:25 PM
> *To:* [email protected]
> *Cc:* Gopal V
> *Subject:* Re: Writing to HDFS from an Output
>
> Also, can the LogicalOutput figure out if it is still valid? I want to
> commit() as the very last thing in LogicalOutput#close().
>
> On Wed, Jul 23, 2014 at 12:56 PM, Thaddeus Diamond <[email protected]> wrote:
>
> Oh wait, looking at the javadoc, OutputCommitter is called from the AM.
> Does it have a list of successful attempt IDs or something that could
> allow it to iterate through batches and move files?
>
> On Tue, Jul 22, 2014 at 4:13 PM, Thaddeus Diamond <[email protected]> wrote:
>
> Okay, I'll look at that. Continuing down the thought of an
> OutputCommitter, what happens if my code looks as follows:
>
>     Processor
>     --------------
>     if (context.canCommit()) {
>       outputCommitter.commit();
>     }
>
>     OutputCommitter
>     --------------
>     void commit() {
>       hdfsClient.move(temporaryFile, finalDestination);
>     }
>
> What happens if the JVM crashes between the if statement and the commit()
> function call? Will any other tasks be able to commit? Will my job just
> fail, or will there be incomplete results?
>
> On Tue, Jul 22, 2014 at 1:21 PM, Bikas Saha <[email protected]> wrote:
>
> That’s correct. That’s why you should probably use MROutput to write to
> HDFS if your data is already Key-Value. Don’t think of MROutput as MR
> compatibility. It’s more like a Key-Value data writer using the
> OutputFormat abstraction.
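A minimal sketch of the contract Bikas describes: the processor writes, asks for permission, and calls commit() only if permission is granted, while close() always runs and only cleans up (so it becomes a no-op after a successful move). It assumes a hypothetical CommitGate interface standing in for the Tez canCommit() check and a hypothetical AttemptOutput class standing in for a LogicalOutput; java.nio.file on the local filesystem stands in for HDFS, whose rename semantics differ. None of these names are Tez APIs, and the sketch does not model a JVM crash between canCommit() and commit().

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

final class CommitCloseSketch {

    /** Hypothetical stand-in for asking the AM whether this attempt may commit. */
    interface CommitGate {
        boolean canCommit();
    }

    /** Hypothetical output: writes to a per-attempt temp file, commits by move. */
    static final class AttemptOutput implements AutoCloseable {
        private final Path tempFile;
        private final Path finalFile;
        private boolean committed = false;

        AttemptOutput(Path tempFile, Path finalFile) {
            this.tempFile = tempFile;
            this.finalFile = finalFile;
        }

        /** Appends one record to this attempt's private temp file. */
        void write(String record) {
            try {
                Files.createDirectories(tempFile.getParent());
                Files.write(tempFile, (record + "\n").getBytes(StandardCharsets.UTF_8),
                        StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        /** Publishes the temp file; the caller must already hold commit permission. */
        void commit() {
            try {
                if (Files.notExists(tempFile)) { // no records written: publish an empty file
                    Files.createDirectories(tempFile.getParent());
                    Files.createFile(tempFile);
                }
                Files.createDirectories(finalFile.getParent());
                Files.move(tempFile, finalFile, StandardCopyOption.ATOMIC_MOVE);
                committed = true;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        /** Runs on every outcome; cleanup only, never a commit. */
        @Override
        public void close() {
            if (committed) {
                return; // the move already happened, so close() is a no-op
            }
            try {
                Files.deleteIfExists(tempFile); // discard an uncommitted attempt's data
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    /** Processor side: write, ask for permission, commit if granted; close() always runs. */
    static boolean runAttempt(AttemptOutput out, CommitGate gate, String... records) {
        try (AttemptOutput o = out) {
            for (String r : records) {
                o.write(r);
            }
            if (gate.canCommit()) {
                o.commit();
                return true;
            }
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("commit-sketch");
        Path finalFile = root.resolve("out").resolve("part-0");
        AttemptOutput first = new AttemptOutput(
                root.resolve("_temporary").resolve("attempt_0").resolve("part-0"), finalFile);
        AttemptOutput second = new AttemptOutput(
                root.resolve("_temporary").resolve("attempt_1").resolve("part-0"), finalFile);
        System.out.println(runAttempt(first, () -> true, "a", "b")); // true
        System.out.println(runAttempt(second, () -> false, "c"));    // false
        System.out.println(Files.readAllLines(finalFile));           // [a, b]
    }
}
```

Keeping the move in commit() and only deletion in close() means close() can safely run on success, failure, or a denied commit without publishing anything twice.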
> We should perhaps change the name of MROutput to remove the MR
> connotation.
>
> Of course, if your data is not KeyValue or you have some other special
> needs, then yes, you need to write your own output.
>
> Bikas
>
> *From:* Thaddeus Diamond [mailto:[email protected]]
> *Sent:* Tuesday, July 22, 2014 6:53 AM
> *To:* Gopal V
> *Cc:* [email protected]
> *Subject:* Re: Writing to HDFS from an Output
>
> Sorry, I see the class now. My LogicalOutput would also have to implement
> OutputCommitter or presumably have a helper class that does.
>
> On Tue, Jul 22, 2014 at 9:49 AM, Thaddeus Diamond <[email protected]> wrote:
>
> I see. Now when you say "both output operations write to unique
> directories", this appears to be something I would have to enforce if I
> wrote my own LogicalOutput class, correct? Presumably I could add a
> commit() method to my LogicalOutput implementation that would actually
> perform the atomic HDFS move when the processor calls it from its close()
> method. Is that what you would suggest?
>
> On Tue, Jul 22, 2014 at 2:38 AM, Gopal V <[email protected]> wrote:
>
> On 7/22/14, 8:19 AM, Thaddeus Diamond wrote:
>
> > Also, is OnFileSortedOutput allowed to be terminal? And is it immune to
> > the race conditions described? It seems like flush() is actually writing
> > to a file stream on HDFS, without using canCommit(), which would have the
> > aforementioned problems.
>
> OnFileSortedOutput is not terminal; it is a local file output mode which
> writes to the local filesystem, not HDFS. It serves as the connecting
> edge between two vertices, and it is not even total-order sorted.
>
> The scatter-gather edge does total order if you use a
> TotalOrderPartitioner (or, in MR-like terms, "a reducer").
>
> You need something like MROutput, connected as a terminal operator after
> that total-order scatter-gather (i.e., send all of partition0 to reducer0
> and have it write 00000_0).
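Gopal's point that total order comes from partitioning by key range and then sorting within each partition can be illustrated with a small range partitioner. This is a simplified sketch, not Hadoop's TotalOrderPartitioner (which loads its split points from a partition file); the RangePartitioner class and the split points "g" and "p" are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TotalOrderSketch {

    /** n sorted split points define n + 1 contiguous key ranges (partitions). */
    static final class RangePartitioner {
        private final String[] splitPoints;

        RangePartitioner(String... splitPoints) {
            this.splitPoints = splitPoints.clone();
            Arrays.sort(this.splitPoints);
        }

        int numPartitions() {
            return splitPoints.length + 1;
        }

        /** Partition i holds keys k with splitPoints[i - 1] <= k < splitPoints[i]. */
        int getPartition(String key) {
            int pos = Arrays.binarySearch(splitPoints, key);
            // Exact hit on split point i goes to partition i + 1; otherwise
            // binarySearch returns -(insertionPoint) - 1 and we use insertionPoint.
            return pos >= 0 ? pos + 1 : -(pos + 1);
        }
    }

    /** Scatter keys by range, then sort each partition as its reducer would. */
    static List<List<String>> partitionAndSort(RangePartitioner p, List<String> keys) {
        List<List<String>> parts = new ArrayList<>();
        for (int i = 0; i < p.numPartitions(); i++) {
            parts.add(new ArrayList<>());
        }
        for (String k : keys) {
            parts.get(p.getPartition(k)).add(k);
        }
        for (List<String> part : parts) {
            part.sort(null); // natural (lexicographic) order within a partition
        }
        return parts;
    }

    public static void main(String[] args) {
        RangePartitioner p = new RangePartitioner("g", "p");
        List<String> keys = List.of("zebra", "apple", "mango", "grape", "kiwi", "banana");
        // Reading part 0, then 1, then 2 yields the keys in global sorted order.
        System.out.println(partitionAndSort(p, keys)); // [[apple, banana], [grape, kiwi, mango], [zebra]]
    }
}
```

Because every key in partition i sorts before every key in partition i + 1, the part files written by reducer0, reducer1, ... are globally ordered when read in partition order.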
>
> But for the sake of clarity, here is a vague simplification of how we
> avoid race conditions in the output phase.
>
> There is a transactional system, which is similar to lock elision.
>
> It works the same way for task-failure scenarios (a split network, for
> instance) as for speculation. It will be used if there is more than one
> attempt for a given task.
>
> The interface to that is called Task::canCommit(). It will return "true"
> for the very first task that gets there and false for every attempt after
> that (well, that's a simplification, but it will do for now).
>
> To prevent two in-flight attempts (due to speculation, or partial writes
> due to failure) from clobbering each other's output files, both output
> operations write to unique directories.
>
> The unique directory name is something like
> <dest-dir>/_temporary/<task>/<attempt>/
>
> To commit an attempt as the final output, files from its temp dir will be
> moved to the final output location.
>
> For people more familiar with native locking, this is an elision with a
> visibility criterion instead of true mutual exclusion (i.e., nothing
> stops two attempts from running in parallel; the first one to finish wins,
> and the loser's side effects are removed).
>
> This is how the system works fast without race conditions.
>
> Cheers,
> Gopal
>
> On Mon, Jul 21, 2014 at 10:15 PM, Thaddeus Diamond <[email protected]> wrote:
>
> Okay, cool, that makes sense. I will look into the OutputCommitter and
> OutputFormat, plus upgrade to 0.5.
>
> Next question: in MR, the OutputCollector provides a native way to do
> global sorting just by specifying the output key. Is there a way with the
> OutputCommitter or OutputFormat to do that? Basically, I just want a way
> to do key-value output and have it be HDFS-sorted on disk.
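Gopal's description of Task::canCommit() (true for the first attempt of a task to ask, false for every later attempt) can be modeled with an atomic first-writer-wins map. This is a hypothetical in-memory CommitArbiter keyed by task ID, not the Tez AM's implementation; as Gopal notes, the real rule is more nuanced, and this sketch does not model what happens if the granted attempt dies before committing.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class CommitArbiterSketch {

    /** Hypothetical AM-side arbiter: the first attempt of a task to ask wins. */
    static final class CommitArbiter {
        private final Map<String, String> winnerByTask = new ConcurrentHashMap<>();

        boolean canCommit(String taskId, String attemptId) {
            // putIfAbsent is atomic, so exactly one attempt per task is recorded
            // even if speculative attempts ask concurrently.
            String winner = winnerByTask.putIfAbsent(taskId, attemptId);
            // null: this call recorded the winner; otherwise only the winner may re-ask.
            return winner == null || winner.equals(attemptId);
        }
    }

    public static void main(String[] args) {
        CommitArbiter am = new CommitArbiter();
        System.out.println(am.canCommit("task_0", "attempt_1")); // true
        System.out.println(am.canCommit("task_0", "attempt_0")); // false
        System.out.println(am.canCommit("task_0", "attempt_1")); // true
        System.out.println(am.canCommit("task_1", "attempt_0")); // true
    }
}
```

Nothing here stops the losing attempt from running; it is simply never allowed to publish, which matches the "visibility criterion instead of mutual exclusion" framing above.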
>
> Thanks,
> Thad
>
> On Mon, Jul 21, 2014 at 2:02 PM, Hitesh Shah <[email protected]> wrote:
>
> Hi Thad,
>
> With respect to speculation-related race conditions, the condition can
> hold true even if speculation is disabled. A task attempt may lose its
> connection to the AM and a new attempt may be launched (even though the
> earlier attempt continues to run). In either scenario, an Output has
> access to a canCommit() API that should be invoked from the task runtime.
> This checks with the AM whether it is the valid attempt before it
> “commits” its data. For an HDFS-based output, it would usually write all
> its data into a “/taskId/attemptId/” sub-dir and move its data into the
> parent “taskId” dir if the canCommit call returns true.
>
> thanks
> — Hitesh
>
> On Jul 21, 2014, at 10:51 AM, Bikas Saha <[email protected]> wrote:
>
> > If it’s the race condition that you are worried about, then you should
> > look at o.a.t.r.a.OutputCommitter.java
> >
> > Whenever an output is specified, one can also specify a committer. The
> > committer is executed after all the outputs have been written (at the
> > end of that vertex's completion or at the end of DAG completion). Its
> > job is to “commit” the output in a way that makes sense for that output.
> > E.g., for HDFS or any Hadoop FileSystem output, users typically specify
> > a FileOutputCommitter. It works in conjunction with the FileOutputFormat.
> > FileOutputFormat writes the individual task outputs to sub-dirs of the
> > actual output dir, thus avoiding collisions between speculative
> > executions or re-executions. In the final output commit phase, the valid
> > output files are moved to the actual output dir and the invalid ones are
> > deleted.
> >
> > They are available in 0.5. If you are still in the POC phase of your
> > project, it’s recommended to work with 0.5 (unreleased), as the APIs
> > have been heavily cleaned up and simplified. Look at
> > OrderedPartitionedKVEdgeConfigurer or UnorderedPartitionedKVEdgeConfigurer.
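Hitesh and Bikas describe the same file-based scheme that Gopal outlines: each attempt writes under its own directory (Gopal gives the shape <dest-dir>/_temporary/<task>/<attempt>/), the valid attempt's files are moved into the final output directory, and whatever remains under _temporary is deleted. The sketch below assumes that layout on the local filesystem; attemptDir, commitAttempt, and cleanup are hypothetical helpers, and the real FileOutputCommitter's directory layout and algorithm differ in detail. A job-level committer that knows the winning attempt for each task could call commitAttempt once per task and then cleanup.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class AttemptDirSketch {

    /** <dest-dir>/_temporary/<task>/<attempt>/, following the layout in this thread. */
    static Path attemptDir(Path dest, String task, String attempt) {
        return dest.resolve("_temporary").resolve(task).resolve(attempt);
    }

    /** Moves every file of the winning attempt into dest (same filesystem assumed). */
    static void commitAttempt(Path dest, String task, String attempt) {
        try (Stream<Path> files = Files.list(attemptDir(dest, task, attempt))) {
            for (Path f : files.collect(Collectors.toList())) {
                Files.move(f, dest.resolve(f.getFileName())); // fails if the name already exists
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Deletes _temporary, including anything losing attempts left behind. */
    static void cleanup(Path dest) {
        Path tmp = dest.resolve("_temporary");
        if (!Files.exists(tmp)) {
            return;
        }
        try (Stream<Path> walk = Files.walk(tmp)) {
            // Reverse order visits children before parents, so directories are empty when deleted.
            for (Path p : walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList())) {
                Files.delete(p);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Test helper: writes one file, creating parent directories. */
    static void write(Path file, String content) {
        try {
            Files.createDirectories(file.getParent());
            Files.write(file, content.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Sorted file names in the final output directory. */
    static List<String> listOutput(Path dest) {
        try (Stream<Path> s = Files.list(dest)) {
            return s.map(p -> p.getFileName().toString()).sorted().collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dest = Files.createTempDirectory("out");
        // Two attempts of task_0 ran (e.g. a speculative or a re-launched one).
        write(attemptDir(dest, "task_0", "attempt_0").resolve("part-00000"), "from attempt_0\n");
        write(attemptDir(dest, "task_0", "attempt_1").resolve("part-00000"), "from attempt_1\n");
        commitAttempt(dest, "task_0", "attempt_1"); // attempt_1 was the one allowed to commit
        cleanup(dest);
        System.out.println(listOutput(dest));                                // [part-00000]
        System.out.println(Files.readAllLines(dest.resolve("part-00000"))); // [from attempt_1]
    }
}
```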
> >
> > From: Thaddeus Diamond [mailto:[email protected]]
> > Sent: Sunday, July 20, 2014 9:02 PM
> > To: [email protected]
> > Cc: [email protected]
> > Subject: Re: Writing to HDFS from an Output
> >
> > Okay, so this means this COULD potentially be a race condition (though
> > presumably it would go away if you disable speculative execution via
> > conf). Would switching over to the new OutputFormat API solve this issue
> > even if I don't use OutputCollector? I do want to be able to leverage
> > speculative execution when it's implemented.
> >
> > Quick aside: Is PartitionedKeyValue in 0.4.1? I am on branch
> > 0.4.1-incubating (79997ff). Couldn't find it.
> >
> > Thanks,
> > Thad
> >
> > On Sun, Jul 20, 2014 at 8:08 PM, Bikas Saha <[email protected]> wrote:
> >
> > The approach is correct from a purist point of view.
> >
> > Since Tez is data-type agnostic, there is no higher-level entity for
> > handling logical data in Tez directly. However, input/output
> > implementations may provide them where it makes sense. E.g., the
> > PartitionedKeyValue outputs allow the specification of a partitioner
> > that can partition key-value data by the key.
> >
> > The MRHelper methods are mainly to help with MR compatibility, though
> > some of them are generic KeyValue helper methods that may be moved into
> > a Tez (non-MR) helper utility. So depending on the method you are using,
> > you may still be native Tez.
> >
> > IMO, MRInput may fall in the same vein. When dealing with KeyValue data
> > types from disparate sources, the InputFormat and OutputFormat layers
> > form a useful abstraction to do that translation. That’s why we decided
> > to include support for them instead of re-defining that translation
> > layer. This way we can leverage all the existing implementations of
> > getting KV data from HDFS/S3/LocalFiles/ZippedFiles/Text/RC etc.
> >
> > Speculation support is not there yet but is tracked as a work item for a
> > near-term release, maybe 0.6.
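Bikas mentions partitioned key-value outputs that accept a partitioner to split data by key. A common choice, and the formula used by Hadoop's default HashPartitioner, is the key's hash with the sign bit cleared, modulo the number of partitions. The HashPartitionSketch class below is a hypothetical illustration of that formula, not a Tez interface.

```java
import java.util.ArrayList;
import java.util.List;

final class HashPartitionSketch {

    /** Same formula as Hadoop's HashPartitioner: clear the sign bit, then take the modulus. */
    static int getPartition(Object key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    /** Groups keys by partition; equal keys always land in the same partition. */
    static List<List<String>> partition(List<String> keys, int numPartitions) {
        List<List<String>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            parts.add(new ArrayList<>());
        }
        for (String k : keys) {
            parts.get(getPartition(k, numPartitions)).add(k);
        }
        return parts;
    }

    public static void main(String[] args) {
        // "a", "b", "c" hash to 97, 98, 99, so with 2 partitions: a -> 1, b -> 0, c -> 1.
        System.out.println(partition(List.of("a", "b", "c", "a"), 2)); // [[b], [a, c, a]]
    }
}
```

Masking with Integer.MAX_VALUE rather than calling Math.abs keeps the result non-negative even for a hash of Integer.MIN_VALUE, where Math.abs would still return a negative number. Unlike the range partitioner, hash partitioning groups equal keys but does not give a total order across partitions.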
> >
> > Bikas
> >
> > From: Thaddeus Diamond [mailto:[email protected]]
> > Sent: Sunday, July 20, 2014 4:27 PM
> > To: [email protected]
> > Subject: Writing to HDFS from an Output
> >
> > Hi,
> >
> > I'm trying to create a simple I/P/O to do the following:
> >
> > Input -> Generate data from Java objects (for now, just random strings)
> > Processor -> Bucket those strings into output groups
> > Output -> Write each output group bucket to HDFS in a different file,
> > under the same subdirectory
> >
> > My Input and Processor classes are uninteresting in this example.
> > The Output I've created (MyLogicalOutput) implements LogicalOutput and
> > creates a new file directly in HDFS using the Java API. This returns an
> > FSDataOutputStream, which it then writes to.
> >
> > My question is this: is this the correct paradigm? I wondered if there
> > were any native Tez abstractions like the OutputCollector in MR.
> >
> > Also, does Tez have speculative execution that could cause race
> > conditions?
> >
> > I don't want to use MRInput or any of the MRHelpers methods to
> > translate an existing MR job; I want this to be native Tez.
> >
> > Thanks!
> > Thad
> >
> > CONFIDENTIALITY NOTICE
> > NOTICE: This message is intended for the use of the individual or
> > entity to which it is addressed and may contain information that is
> > confidential, privileged and exempt from disclosure under applicable law.
> > If the reader of this message is not the intended recipient, you are hereby
> > notified that any printing, copying, dissemination, distribution,
> > disclosure or forwarding of this communication is strictly prohibited. If
> > you have received this communication in error, please contact the sender
> > immediately and delete it from your system. Thank You.
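Thad's original setup (the Input generates strings, the Processor buckets them, the Output writes each bucket to its own file under one directory) can be sketched with plain java.nio.file on the local filesystem standing in for HDFS. The bucket rule (first character of each string) and the bucket-<name> file naming are hypothetical. Written this way, straight to final paths, the output is exposed to the attempt races discussed in this thread; a real Output would write under a per-attempt directory and publish only after canCommit() returns true.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

final class BucketWriterSketch {

    /** Processor step: group records by a bucket key (TreeMap keeps buckets ordered). */
    static Map<String, List<String>> bucket(List<String> records, Function<String, String> bucketOf) {
        Map<String, List<String>> buckets = new TreeMap<>();
        for (String r : records) {
            buckets.computeIfAbsent(bucketOf.apply(r), k -> new ArrayList<>()).add(r);
        }
        return buckets;
    }

    /** Output step: one file per bucket, all under the same directory. */
    static void writeBuckets(Path dir, Map<String, List<String>> buckets) {
        try {
            Files.createDirectories(dir);
            for (Map.Entry<String, List<String>> entry : buckets.entrySet()) {
                // Files.write with an Iterable writes one line per element.
                Files.write(dir.resolve("bucket-" + entry.getKey()), entry.getValue(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical bucket rule: first character of each (non-empty) string.
        Map<String, List<String>> buckets =
                bucket(List.of("apple", "avocado", "banana"), s -> s.substring(0, 1));
        System.out.println(buckets); // {a=[apple, avocado], b=[banana]}
        Path dir = Files.createTempDirectory("buckets");
        writeBuckets(dir, buckets);
        System.out.println(Files.readAllLines(dir.resolve("bucket-a"))); // [apple, avocado]
    }
}
```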
