[ 
https://issues.apache.org/jira/browse/TEZ-714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14385973#comment-14385973
 ] 

Bikas Saha commented on TEZ-714:
--------------------------------

After the first if() check, shouldn't the code simply return 
vertex.finished(SUCCEEDED)? Why check commitFutures.isEmpty() again?
Alternatively, we could limit the new commitOrFinish() method to just 
scheduling async commits. The code in checkForCompletion() can then check 
whether commitFutures is empty to decide the return state.
{code}
        if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) {
          return commitOrFinish(vertex);
        }
        if (vertex.commitFutures.isEmpty()) {
          // all the commits are completed successfully
          return vertex.finished(VertexState.SUCCEEDED);
        } else {
          return VertexState.COMMITTING;
        }
{code}
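
To make the alternative concrete, here is a rough sketch of what I have in mind. VertexSketch, scheduleCommits() and commitCompleted() are hypothetical names and not the actual VertexImpl code; the placeholder futures stand in for whatever the async commit executor would return.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified stand-in for VertexImpl; not the actual Tez class.
class VertexSketch {
    enum State { COMMITTING, SUCCEEDED }

    final boolean commitVertexOutputs;
    final List<String> outputs;
    final AtomicBoolean committed = new AtomicBoolean(false);
    final Map<String, Future<?>> commitFutures = new ConcurrentHashMap<>();

    VertexSketch(boolean commitVertexOutputs, List<String> outputs) {
        this.commitVertexOutputs = commitVertexOutputs;
        this.outputs = outputs;
    }

    // Only schedules async commits; it never decides the vertex state.
    // Assumption: a real version would submit each commit to an executor.
    void scheduleCommits() {
        for (String output : outputs) {
            commitFutures.put(output, new CompletableFuture<Void>());
        }
    }

    // Assumed to be invoked when a commit-completed event arrives.
    void commitCompleted(String output) {
        commitFutures.remove(output);
    }

    // The state decision lives in exactly one place.
    State checkForCompletion() {
        if (commitVertexOutputs && !committed.getAndSet(true)) {
            scheduleCommits(); // getAndSet ensures this runs at most once
        }
        return commitFutures.isEmpty() ? State.SUCCEEDED : State.COMMITTING;
    }
}
```

With this shape, a second call to checkForCompletion() while commits are pending just returns COMMITTING again and schedules nothing new.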

Vertex.checkForCompletion() can be called multiple times. What prevents 
multiple commit operations from being scheduled?

Opened TEZ-2248 to reduce the complexity of error handling in 
VertexImpl.checkForCompletion().

Why is checkForCompletion() called both after tasks finish and after commits 
finish? Those two sound like different logical steps and should use separate 
methods, perhaps preCommit() and postCommit(). If there is nothing to commit 
then both can be called back to back.
{code}  // either task_complete or commit_complete will trigger this method
{code}
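
A minimal sketch of the split, assuming simple counters for tasks and pending commits. CommitPhasesSketch and its event methods are hypothetical and only illustrate the control flow, not the real state machine.

```java
// Hypothetical sketch: separate entry points for "tasks done" and "commits done".
class CommitPhasesSketch {
    enum State { RUNNING, COMMITTING, SUCCEEDED }

    private final int totalTasks;
    private final int outputsToCommit;
    private int completedTasks;
    private int pendingCommits;
    State state = State.RUNNING;

    CommitPhasesSketch(int totalTasks, int outputsToCommit) {
        this.totalTasks = totalTasks;
        this.outputsToCommit = outputsToCommit;
    }

    // task_complete path: only ever leads to preCommit().
    void onTaskCompleted() {
        completedTasks++;
        if (completedTasks == totalTasks) {
            preCommit();
        }
    }

    // commit_complete path: only ever leads to postCommit().
    void onCommitCompleted() {
        pendingCommits--;
        if (pendingCommits == 0) {
            postCommit();
        }
    }

    private void preCommit() {
        pendingCommits = outputsToCommit;
        if (pendingCommits == 0) {
            postCommit(); // nothing to commit: call both back to back
            return;
        }
        state = State.COMMITTING; // async commits would be scheduled here
    }

    private void postCommit() {
        state = State.SUCCEEDED;
    }
}
```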

Not sure about this logic. Since the vertex is committing before the DAG 
commit, the user wants to see partial output. So the two courses of action are:
1) Optimistic case: Ignore failed commits and allow all commits to complete 
(with success or failure). If all commits succeed then proceed to succeeded. 
If any commit fails, then proceed to failed.
2) Pessimistic case: If any commit fails then abort all other commits. All or 
none per vertex.
The code seems to have chosen 1 (keep committed outputs). However, the abort 
method is called from multiple places, such as the DAG, where the same choice 
may not hold. E.g. TaskCompletedAfterVertexSuccessTransition clears all 
successful commits, which chooses the pessimistic case. 
TaskCompletedAfterVertexSuccessTransition also aborts all pending commits. So 
the logic is not consistent everywhere. We need to make it consistent: choose 
either the pessimistic case or the optimistic case.
For the pessimistic case, all failures should go into the Terminating state 
(which is happening in this patch). Errors should cancel pending commits. 
However, all async commit operations need to report back so that the 
Terminating state can count them and then call abort on all committers once 
all async commit operations have completed. (We need to wait because we do not 
want to run a commit and an abort operation concurrently.)
For the optimistic case, all failures should go into the Terminating state. 
Errors should not cancel commits. When all commit operations have completed 
(success or failure), the Terminating state should go into the failed/killed 
state.
I would opt for the optimistic case, because the user wants to see partial 
output and because the pessimistic case cannot be truly achieved. The first 
committed output could be seen by external entities before we abort it.
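
Roughly, the optimistic case could be sketched as below. TerminatingSketch is a hypothetical, event-driven illustration that assumes at least one pending commit and reduces the failed/killed distinction to FAILED; it is not the patch's state machine.

```java
// Hypothetical sketch of the optimistic case: a failed commit moves the vertex
// to TERMINATING but does not cancel the other commits; the final state is
// decided only after every commit has reported back.
class TerminatingSketch {
    enum State { COMMITTING, TERMINATING, SUCCEEDED, FAILED }

    private int pendingCommits;
    State state = State.COMMITTING;

    // Assumption: pendingCommits is at least 1.
    TerminatingSketch(int pendingCommits) {
        this.pendingCommits = pendingCommits;
    }

    // Assumed to be called once per async commit, from the dispatcher thread.
    void onCommitCompleted(boolean success) {
        pendingCommits--;
        if (!success && state == State.COMMITTING) {
            state = State.TERMINATING; // remember the failure, keep waiting
        }
        if (pendingCommits == 0) {
            state = (state == State.TERMINATING) ? State.FAILED : State.SUCCEEDED;
        }
    }
}
```

Because the final transition happens only when the counter reaches zero, no commit is still running when the vertex leaves the Terminating state.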
For the abort method itself, it could take a boolean parameter indicating 
whether to abort all outputs, instead of looking at the successful completions 
map. This avoids side-effect code like having to clear the successful maps 
before invoking abortVertex().
{code}
                if (succeededCommits.contains(entry.getKey()) && commitVertexOutputs) {
                  LOG.info("Don't abort for output=" + entry.getKey() + ", vertexId=" + logIdentifier
                      + ", since it has already commtted successfully");
                } else {
                  LOG.info("Invoking committer abort for output=" + entry.getKey() + ", vertexId="
                      + logIdentifier);
                  entry.getValue().abortOutput(finalState);
                }
{code}
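
Something along these lines is what I mean by the boolean parameter. AbortSketch is hypothetical: each committer's abort is modelled as a Runnable rather than the real abortOutput(finalState) call, and the returned list exists only to make the behaviour visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: the caller states its intent via abortAll instead of
// clearing the succeeded-commits collection before calling abort.
class AbortSketch {
    // Returns the names of the outputs whose abort was invoked.
    static List<String> abortVertex(Map<String, Runnable> aborts,
                                    Set<String> succeededCommits,
                                    boolean abortAll) {
        List<String> aborted = new ArrayList<>();
        for (Map.Entry<String, Runnable> entry : aborts.entrySet()) {
            if (!abortAll && succeededCommits.contains(entry.getKey())) {
                continue; // keep outputs that already committed successfully
            }
            entry.getValue().run(); // stands in for abortOutput(finalState)
            aborted.add(entry.getKey());
        }
        return aborted;
    }
}
```

The succeeded set is read but never mutated, so callers that want all-or-nothing behaviour pass abortAll=true instead of clearing it first.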

Not looking at DAGImpl yet, as the above comments/changes might have similar 
implications in DAGImpl.

> OutputCommitters should not run in the main AM dispatcher thread
> ----------------------------------------------------------------
>
>                 Key: TEZ-714
>                 URL: https://issues.apache.org/jira/browse/TEZ-714
>             Project: Apache Tez
>          Issue Type: Improvement
>            Reporter: Siddharth Seth
>            Assignee: Jeff Zhang
>            Priority: Critical
>         Attachments: DAG_2.pdf, TEZ-714-1.patch, TEZ-714-2.patch, 
> TEZ-714-3.patch, TEZ-714-4.patch, TEZ-714-5.patch, Vertex_2.pdf
>
>
> Follow up jira from TEZ-41.
> 1) If there's multiple OutputCommitters on a Vertex, they can be run in 
> parallel.
> 2) Running an OutputCommitter in the main thread blocks all other event 
> handling, w.r.t the DAG, and causes the event queue to back up.
> 3) This should also cover shared commits that happen in the DAG.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
