[ https://issues.apache.org/jira/browse/TEZ-714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14385973#comment-14385973 ]
Bikas Saha commented on TEZ-714:
--------------------------------

After the first if() check, shouldn't the code simply return vertex.finished(SUCCEEDED)? Why check again for commitFutures.isEmpty()? Alternatively, we could limit the new commitOrFinish() method to just scheduling async commits. The code in checkForCompletion() can then check whether commitFutures is empty to decide the return state.
{code}
if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) {
  return commitOrFinish(vertex);
}
if (vertex.commitFutures.isEmpty()) {
  // all the commits are completed successfully
  return vertex.finished(VertexState.SUCCEEDED);
} else {
  return VertexState.COMMITTING;
}
{code}
Vertex.checkForCompletion() can be called multiple times. What prevents multiple commit operations from being scheduled?

Opened TEZ-2248 to reduce the complexity of error handling in VertexImpl.checkForCompletion().

Why is checkForCompletion called both after tasks finish and after commits finish? Those 2 sound like different logical steps and should use separate methods. Perhaps a preCommit() and a postCommit() method. If there is nothing to commit then both can be called back to back.
{code}
// either task_complete or commit_complete will trigger this method
{code}

Not sure about this logic. Since the vertex is committing before dag commit, the user wants to see partial output. So the 2 courses of action are
1) Optimistic case: Ignore failed commits and allow all commits to complete (with success or fail). If all commits succeed then proceed to succeeded. If any commit fails, then proceed to failed.
2) Pessimistic case: If any commit fails then abort all other commits. All or none per vertex.

The code seems to have chosen 1 - keep committed outputs. However, the abort method is called from multiple places like DAG where the same choice may not be true. E.g. TaskCompletedAfterVertexSuccessTransition is clearing all successful commits and choosing the pessimistic case.
TaskCompletedAfterVertexSuccessTransition is aborting all pending commits. So the logic does not seem to be consistent everywhere. We need to make this consistent. Either choose the pessimistic case or choose the optimistic case.

For the pessimistic case, all failures should go into the Terminating state (which is happening in this patch). Errors should cancel pending commits. However, all async commit operations need to return so that the Terminating state can count them and then call abort on all committers once all async commit operations have completed. (We need to wait because we do not want to run a commit and an abort operation concurrently.)

For the optimistic case, all failures should go into the Terminating state. Errors should not cancel commits. When all commit operations have completed (success or fail), the Terminating state should go into the failed/killed state.

I would opt for the optimistic case, because the user wants to see partial output and because the pessimistic case cannot be truly achieved. The first committed output could be seen by external entities before we abort it.

For the abort method itself, it could take a boolean parameter on whether to abort all or not, instead of looking at the successful completions map. This avoids side-effect code like having to clear the successful maps before invoking abortVertex().
{code}
if (succeededCommits.contains(entry.getKey()) && commitVertexOutputs) {
  LOG.info("Don't abort for output=" + entry.getKey() + ", vertexId=" + logIdentifier
      + ", since it has already committed successfully");
} else {
  LOG.info("Invoking committer abort for output=" + entry.getKey() + ", vertexId=" + logIdentifier);
  entry.getValue().abortOutput(finalState);
}
{code}

Not looking at DAGImpl, as the above comments/changes might have similar implications in DAGImpl.
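
To make the optimistic option and the boolean abort parameter concrete, here is a rough sketch of the bookkeeping. It is not taken from the patch: OptimisticCommitTracker, startCommits(), onCommitCompleted() and outputsToAbort() are hypothetical names, outputs are plain strings, and no real committer is invoked. It only shows the intended rules: commits are scheduled once, a failed commit does not cancel the others, the final state is decided when every commit has reported back, and the abort policy is passed in by the caller rather than inferred from the map of successful commits.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the commit bookkeeping of a vertex; not the VertexImpl API.
class OptimisticCommitTracker {
    enum State { COMMITTING, SUCCEEDED, FAILED }

    private final List<String> outputs;                      // names of outputs to commit
    private final Set<String> pending = new HashSet<>();     // commits not yet reported back
    private final Set<String> succeeded = new HashSet<>();   // commits that completed successfully
    private final AtomicBoolean committed = new AtomicBoolean(false);
    private boolean anyFailed = false;

    OptimisticCommitTracker(List<String> outputs) {
        this.outputs = new ArrayList<>(outputs);
    }

    /** Returns true only for the first caller, so commits are scheduled at most once. */
    synchronized boolean startCommits() {
        if (committed.getAndSet(true)) {
            return false;
        }
        pending.addAll(outputs);
        return true;
    }

    /** Records one finished commit. A failure is remembered but never cancels the others. */
    synchronized State onCommitCompleted(String output, boolean success) {
        pending.remove(output);
        if (success) {
            succeeded.add(output);
        } else {
            anyFailed = true;
        }
        if (!pending.isEmpty()) {
            return State.COMMITTING;   // still waiting for other commits to report back
        }
        return anyFailed ? State.FAILED : State.SUCCEEDED;
    }

    /** Lists outputs to abort; the caller states the policy instead of clearing maps first. */
    synchronized List<String> outputsToAbort(boolean abortAll) {
        List<String> result = new ArrayList<>();
        for (String output : outputs) {
            if (abortAll || !succeeded.contains(output)) {
                result.add(output);
            }
        }
        return result;
    }
}
```

With this shape, a caller that wants to keep committed outputs would ask for outputsToAbort(false), while a caller that wants all-or-none would ask for outputsToAbort(true), and neither has to mutate the tracker's state beforehand.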

> OutputCommitters should not run in the main AM dispatcher thread
> ----------------------------------------------------------------
>
>                 Key: TEZ-714
>                 URL: https://issues.apache.org/jira/browse/TEZ-714
>             Project: Apache Tez
>          Issue Type: Improvement
>            Reporter: Siddharth Seth
>            Assignee: Jeff Zhang
>            Priority: Critical
>         Attachments: DAG_2.pdf, TEZ-714-1.patch, TEZ-714-2.patch,
> TEZ-714-3.patch, TEZ-714-4.patch, TEZ-714-5.patch, Vertex_2.pdf
>
>
> Follow up jira from TEZ-41.
> 1) If there's multiple OutputCommitters on a Vertex, they can be run in
> parallel.
> 2) Running an OutputCommitter in the main thread blocks all other event
> handling, w.r.t the DAG, and causes the event queue to back up.
> 3) This should also cover shared commits that happen in the DAG.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)