[jira] [Comment Edited] (TEZ-714) OutputCommitters should not run in the main AM dispatcher thread
[ https://issues.apache.org/jira/browse/TEZ-714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14486038#comment-14486038 ]

Bikas Saha edited comment on TEZ-714 at 4/8/15 9:06 PM:

abortVertex() should probably be called for all non-succeeded states? Similar to DAGImpl. Otherwise TaskCompletedAfterVertexSuccessTransition will have a regression.
{code}
case ERROR:
  addDiagnostic("Vertex: " + logIdentifier + " error due to:" + terminationCause);
  if (!StringUtils.isEmpty(diag)) {
    addDiagnostic(diag);
  }
  eventHandler.handle(new DAGEvent(getDAGId(), DAGEventType.INTERNAL_ERROR));
  try {
    logJobHistoryVertexFailedEvent(finalState);
  } catch (IOException e) {
    LOG.error("Failed to send vertex finished event to recovery", e);
  }
  break;
case KILLED:
case FAILED:
  addDiagnostic("Vertex " + logIdentifier + " killed/failed due to:" + terminationCause);
  if (!StringUtils.isEmpty(diag)) {
    addDiagnostic(diag);
  }
  abortVertex(VertexStatus.State.valueOf(finalState.name()));
{code}
testCommitOutputOnVertexSuccess() still has
{code}
// both committers fail
DAG dag4 = createDAG("testDAGBothCommitsFail", false, true); <<< true/true
{code}
Mis-written comment? v3 is killed due to...
{code}
// killed is failed due to the commit failure of the vertex group (v1,v2)
Assert.assertEquals(VertexState.KILLED, v3.getState());
Assert.assertEquals(VertexTerminationCause.OTHER_VERTEX_FAILURE,
{code}
The test is not checking/verifying that one of the commits was indeed not scheduled on the threadpool and was cancelled before that. Maybe set a flag when the callable is scheduled and check that the flag was not set. Also perhaps set a flag when the InterruptedException is caught after the scheduled commit got canceled.
{code}
// test commit will be canceled no matter it is started or still in the threadpool
@Test(timeout = 5000)
public void testCommitCanceled() throws Exception {
{code}
Please clean up the findbugs warnings.
Spoke offline with [~hitesh] about aborting all output after any failure and it seems like the right thing to do.
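The flag idea from the review can be sketched outside of Tez as follows. This is only an illustration under stated assumptions: `FlaggedCommit` and `CommitCancelSketch` are hypothetical names, not classes from the patch, and a single-threaded `ExecutorService` stands in for the commit threadpool so that the second commit is guaranteed to still be queued when it is cancelled.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical test-only commit task; it is not a Tez class.
class FlaggedCommit implements Runnable {
  final AtomicBoolean started = new AtomicBoolean(false);      // set once run() is entered
  final AtomicBoolean interrupted = new AtomicBoolean(false);  // set if the commit was interrupted
  final CountDownLatch running = new CountDownLatch(1);

  @Override
  public void run() {
    started.set(true);
    running.countDown();
    try {
      Thread.sleep(60_000);  // stands in for a slow commit
    } catch (InterruptedException e) {
      interrupted.set(true);
    }
  }
}

class CommitCancelSketch {
  /** Returns {firstStarted, firstInterrupted, secondStarted}. */
  static boolean[] runScenario() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    FlaggedCommit first = new FlaggedCommit();
    FlaggedCommit second = new FlaggedCommit();
    try {
      Future<?> f1 = pool.submit(first);
      Future<?> f2 = pool.submit(second);  // queued behind the first commit
      first.running.await();               // the first commit now occupies the only thread
      f2.cancel(true);                     // cancelled while still in the queue
      f1.cancel(true);                     // interrupts the running commit
      pool.shutdown();
      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("commit thread did not exit");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      pool.shutdownNow();
    }
    return new boolean[] {first.started.get(), first.interrupted.get(), second.started.get()};
  }

  public static void main(String[] args) {
    boolean[] r = runScenario();
    System.out.println("first started=" + r[0] + ", first interrupted=" + r[1]
        + ", second started=" + r[2]);
  }
}
```

A test along these lines would assert that the first commit's `interrupted` flag is set and that the second commit's `started` flag is never set, which covers both the "started" and the "still in the threadpool" cases.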
> OutputCommitters should not run in the main AM dispatcher thread
>
> Key: TEZ-714
> URL: https://issues.apache.org/jira/browse/TEZ-714
> Project: Apache Tez
> Issue Type: Improvement
> Reporter: Siddharth Seth
> Assignee: Jeff Zhang
> Priority: Critical
> Attachments: DAG_2.pdf, TEZ-714-1.patch, TEZ-714-10.patch, TEZ-714-2.patch, TEZ-714-3.patch, TEZ-714-4.patch, TEZ-714-5.patch, TEZ-714-6.patch, TEZ-714-7.patch, TEZ-714-8.patch, TEZ-714-9.patch, Vertex
[jira] [Comment Edited] (TEZ-714) OutputCommitters should not run in the main AM dispatcher thread
[ https://issues.apache.org/jira/browse/TEZ-714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14386086#comment-14386086 ]

Jeff Zhang edited comment on TEZ-714 at 3/30/15 7:03 AM:

bq. This is a known issue. There is a comment somewhere in the code but I dont think its tracked by a jira.
Created TEZ-2249 to track it.

bq. After the first if() check shouldn't the code simply return vertex.finished(SUCCEEDED) ? Why check again for commitFutures.isEmpty()?
The first if() means that TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is false and the commit has not been started. In this case we call commitOrFinished, which starts the async commit if the vertex has any committer, or just finishes if it has none. The second if() means that either TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is true or all the committers have completed successfully. The code here may need refactoring to make this clearer.

bq. Why is checkForCompletion called after tasks finished and commit finished? Those 2 sounds like different logical steps and should use separate methods. Perhaps a preCommit() and postCommit() method. If there is nothing to commit then both can be called back to back.
Either task completion or commit completion may be the last step before the finished state (task completion is the last step if there is no commit, commit completion is the last step if there is one), so both trigger checkForCompletion.

bq. Optimistic case: Ignore failed commits and allow all commits to complete (with success or fail). If all commits succeed then proceed to succeeded. If any commit fails, then proceed to failed.
What I am doing is not exactly this. I don't ignore failed commits; instead I cancel the pending commits, wait for them to complete, and then move to the failed state. This is consistent with the other failure cases (commit failure, vertex termination event, etc.): all of them cancel pending commits, wait for them to complete, and then move to the finished state.

bq. For the optimistic case, all failures should go into the Terminating state. Errors should not cancel commits.
Why should errors not cancel commits? Just because errors have no effect on the data to be committed?

bq. E.g. TaskCompletedAfterVertexSuccessTransition is clearing all successful commits and choosing the pessimistic case. TaskCompletedAfterVertexSuccessTransition is aborting all pending commits. So the logic does not seem to be consistent everywhere. We need to make this consistent. Either choose the pessimistic case or choose the optimistic case.
bq. For the abort method itself, it could take a boolean parameter on whether to abort all or not instead of looking at the successful completions map. This avoids side-effect code like having to clear the successful maps before invoking abortVertex().
On second thought, I think it may not be necessary to check whether the commits were successful. The semantics of abort should be to clean up any intermediate data produced during committing; it should have no side effect on the committed data. So abort has no effect on the successful commits.
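The "cancel pending commits, wait for them to complete, then move to the failed state" behaviour described in this comment can be sketched with plain `java.util.concurrent` types. This is a simplified illustration, not the patch: `CancelOnFailureSketch`, `commitAll` and `FinalState` are hypothetical names, and the calling thread blocks here, whereas the AM would presumably react to commit-completed events so that the dispatcher thread is never blocked.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class CancelOnFailureSketch {
  enum FinalState { SUCCEEDED, FAILED }

  /** Runs every commit on a pool; the first failure cancels all the others. */
  static FinalState commitAll(List<Callable<Void>> commits, int threads) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    CompletionService<Void> completions = new ExecutorCompletionService<>(pool);
    List<Future<Void>> futures = new ArrayList<>();
    for (Callable<Void> commit : commits) {
      futures.add(completions.submit(commit));
    }
    FinalState result = FinalState.SUCCEEDED;
    try {
      for (int i = 0; i < futures.size(); i++) {
        try {
          completions.take().get();  // next commit to finish, in completion order
        } catch (ExecutionException e) {
          result = FinalState.FAILED;
          for (Future<Void> f : futures) {
            f.cancel(true);  // no-op for finished commits, interrupts running ones
          }
          break;
        }
      }
      pool.shutdown();
      // Wait for interrupted commits to actually exit before reporting a final state.
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        result = FinalState.FAILED;
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      result = FinalState.FAILED;
    } finally {
      pool.shutdownNow();
    }
    return result;
  }

  public static void main(String[] args) {
    Callable<Void> ok = () -> null;
    Callable<Void> bad = () -> { throw new IllegalStateException("commit failed"); };
    System.out.println(commitAll(Arrays.asList(ok, ok), 2));
    System.out.println(commitAll(Arrays.asList(ok, bad), 2));
  }
}
```

The point of waiting on `awaitTermination` rather than on `Future.get()` is that a cancelled future reports completion immediately, even while the interrupted commit is still unwinding on its thread.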
[jira] [Comment Edited] (TEZ-714) OutputCommitters should not run in the main AM dispatcher thread
[ https://issues.apache.org/jira/browse/TEZ-714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14386086#comment-14386086 ]

Jeff Zhang edited comment on TEZ-714 at 3/30/15 3:18 AM:

bq. This is a known issue. There is a comment somewhere in the code but I dont think its tracked by a jira.
Created TEZ-2249 to track it.

bq. After the first if() check shouldn't the code simply return vertex.finished(SUCCEEDED) ? Why check again for commitFutures.isEmpty()?
The first if() means that TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is false and the commit has not been started. In this case we call commitOrFinished, which starts the async commit if the vertex has any committer, or just finishes if it has none. The second if() means that either TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is true or all the committers have completed successfully. The code here may need refactoring to make this clearer.

bq. Why is checkForCompletion called after tasks finished and commit finished? Those 2 sounds like different logical steps and should use separate methods. Perhaps a preCommit() and postCommit() method. If there is nothing to commit then both can be called back to back.
Either task completion or commit completion may be the last step before the finished state (task completion is the last step if there is no commit, commit completion is the last step if there is one), so both trigger checkForCompletion.

bq. Optimistic case: Ignore failed commits and allow all commits to complete (with success or fail). If all commits succeed then proceed to succeeded. If any commit fails, then proceed to failed.
What I am doing is not exactly this. I don't ignore failed commits; instead I cancel the pending commits, wait for them to complete, and then move to the failed state. This is consistent with the other failure cases (commit failure, vertex termination event, etc.): all of them cancel pending commits, wait for them to complete, and then move to the finished state.

bq. For the optimistic case, all failures should go into the Terminating state. Errors should not cancel commits.
Why should errors not cancel commits? Just because errors have no effect on the data to be committed?

bq. E.g. TaskCompletedAfterVertexSuccessTransition is clearing all successful commits and choosing the pessimistic case. TaskCompletedAfterVertexSuccessTransition is aborting all pending commits. So the logic does not seem to be consistent everywhere. We need to make this consistent. Either choose the pessimistic case or choose the optimistic case.
bq. For the abort method itself, it could take a boolean parameter on whether to abort all or not instead of looking at the successful completions map. This avoids side-effect code like having to clear the successful maps before invoking abortVertex().
On second thought, I think it may not be necessary to check whether the commits were successful. The semantics of abort should be to clean up any intermediate data produced during committing; it should have no side effect on the committed data.
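The two if() checks discussed in this comment can be written down as a small decision function. This is a reconstruction from the description only, not the code in the patch: `CompletionCheckSketch`, `Action` and the parameter names are hypothetical, and `commitAllOnDagSuccess` stands for the value of TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS.

```java
class CompletionCheckSketch {
  enum Action { START_COMMIT, FINISH_SUCCEEDED, WAIT_FOR_COMMITS }

  /**
   * Decides what to do once all tasks of a vertex have succeeded, or once a commit completes.
   *
   * @param commitAllOnDagSuccess assumed value of TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS
   * @param commitStarted         whether the vertex-level commit was already kicked off
   * @param committerCount        number of committers configured on the vertex
   * @param pendingCommits        number of commit futures that have not completed yet
   */
  static Action next(boolean commitAllOnDagSuccess, boolean commitStarted,
                     int committerCount, int pendingCommits) {
    // First if(): commits happen per vertex and have not been started.
    if (!commitAllOnDagSuccess && !commitStarted) {
      // The "commitOrFinished" step: start async commits, or finish if nothing to commit.
      return committerCount > 0 ? Action.START_COMMIT : Action.FINISH_SUCCEEDED;
    }
    // Second if(): commits are deferred to the DAG, or every commit has completed.
    if (commitAllOnDagSuccess || pendingCommits == 0) {
      return Action.FINISH_SUCCEEDED;
    }
    // Some commits are still running; a later commit-completed event re-runs the check.
    return Action.WAIT_FOR_COMMITS;
  }

  public static void main(String[] args) {
    System.out.println(next(false, false, 2, 0));
    System.out.println(next(false, true, 2, 1));
    System.out.println(next(false, true, 2, 0));
    System.out.println(next(true, false, 2, 0));
  }
}
```

Writing it this way also shows why the same check can run after both task completion and commit completion: whichever event arrives last is the one that yields the finish action.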
[jira] [Comment Edited] (TEZ-714) OutputCommitters should not run in the main AM dispatcher thread
[ https://issues.apache.org/jira/browse/TEZ-714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14386086#comment-14386086 ]

Jeff Zhang edited comment on TEZ-714 at 3/30/15 3:14 AM:

bq. This is a known issue. There is a comment somewhere in the code but I dont think its tracked by a jira.
Created TEZ-2249 to track it.

bq. After the first if() check shouldn't the code simply return vertex.finished(SUCCEEDED) ? Why check again for commitFutures.isEmpty()?
The first if() means that TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is false and the commit has not been started. In this case we call commitOrFinished, which starts the async commit if the vertex has any committer, or just finishes if it has none. The second if() means that either TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is true or all the committers have completed successfully. The code here may need refactoring to make this clearer.

bq. Why is checkForCompletion called after tasks finished and commit finished? Those 2 sounds like different logical steps and should use separate methods. Perhaps a preCommit() and postCommit() method. If there is nothing to commit then both can be called back to back.
Either task completion or commit completion may be the last step before the finished state (task completion is the last step if there is no commit, commit completion is the last step if there is one), so both trigger checkForCompletion.

bq. Optimistic case: Ignore failed commits and allow all commits to complete (with success or fail). If all commits succeed then proceed to succeeded. If any commit fails, then proceed to failed.
What I am doing is not exactly this. I don't ignore failed commits; instead I cancel the pending commits, wait for them to complete, and then move to the failed state. This is consistent with the other failure cases (commit failure, vertex termination event, etc.): all of them cancel pending commits, wait for them to complete, and then move to the finished state.

bq. For the optimistic case, all failures should go into the Terminating state. Errors should not cancel commits.
Why should errors not cancel commits? Just because errors have no effect on the data to be committed?
[jira] [Comment Edited] (TEZ-714) OutputCommitters should not run in the main AM dispatcher thread
[ https://issues.apache.org/jira/browse/TEZ-714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14386113#comment-14386113 ]

Jeff Zhang edited comment on TEZ-714 at 3/30/15 2:47 AM:

Regarding the partial output: on second thought, I think we should treat it on a per-vertex basis rather than a per-output basis. That means either all of a vertex's outputs commit successfully or all of them are aborted. I think the purpose of TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is to allow an external system to check an intermediate vertex's output at the vertex level. Say a vertex has 2 outputs and TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is false: if the first commit succeeds but the second commit fails, then we should abort both of them and mark the vertex as failed. It would be weird for a vertex to go to FAILED with one commit aborted while the other commit is not.

> OutputCommitters should not run in the main AM dispatcher thread
>
> Key: TEZ-714
> URL: https://issues.apache.org/jira/browse/TEZ-714
> Project: Apache Tez
> Issue Type: Improvement
> Reporter: Siddharth Seth
> Assignee: Jeff Zhang
> Priority: Critical
> Attachments: DAG_2.pdf, TEZ-714-1.patch, TEZ-714-2.patch, TEZ-714-3.patch, TEZ-714-4.patch, TEZ-714-5.patch, Vertex_2.pdf
>
> Follow up jira from TEZ-41.
> 1) If there's multiple OutputCommitters on a Vertex, they can be run in parallel.
> 2) Running an OutputCommitter in the main thread blocks all other event handling, w.r.t the DAG, and causes the event queue to back up.
> 3) This should also cover shared commits that happen in the DAG.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
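The per-vertex "all outputs commit or all are aborted" rule from the comment above, combined with point 1) of the issue description (committers running in parallel off the calling thread), might look like the following sketch. The `Committer` interface, `RecordingCommitter` and `commitVertex` are hypothetical stand-ins for illustration and are not the Tez OutputCommitter API.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class VertexCommitSketch {
  /** Hypothetical, minimal committer contract used only by this sketch. */
  interface Committer {
    void commit() throws Exception;
    void abort();
  }

  enum VertexState { SUCCEEDED, FAILED }

  /** Test helper that records whether abort() was called. */
  static class RecordingCommitter implements Committer {
    private final boolean failOnCommit;
    volatile boolean aborted;

    RecordingCommitter(boolean failOnCommit) {
      this.failOnCommit = failOnCommit;
    }

    @Override
    public void commit() throws Exception {
      if (failOnCommit) {
        throw new IOException("commit failed");
      }
    }

    @Override
    public void abort() {
      aborted = true;
    }
  }

  /** Commits all outputs of one vertex in parallel; any failure aborts every output. */
  static VertexState commitVertex(List<Committer> committers, ExecutorService pool) {
    List<Callable<Void>> tasks = new ArrayList<>();
    for (Committer c : committers) {
      tasks.add(() -> {
        c.commit();
        return null;
      });
    }
    boolean anyFailed = false;
    try {
      for (Future<Void> f : pool.invokeAll(tasks)) {  // blocks until every commit is done
        try {
          f.get();
        } catch (ExecutionException e) {
          anyFailed = true;
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      anyFailed = true;
    }
    if (!anyFailed) {
      return VertexState.SUCCEEDED;
    }
    for (Committer c : committers) {
      c.abort();  // vertex-level semantics: abort all outputs, not only the failed one
    }
    return VertexState.FAILED;
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    RecordingCommitter good = new RecordingCommitter(false);
    RecordingCommitter bad = new RecordingCommitter(true);
    System.out.println(commitVertex(Arrays.<Committer>asList(good, bad), pool));
    System.out.println("good aborted=" + good.aborted + ", bad aborted=" + bad.aborted);
    pool.shutdown();
  }
}
```

Whether abort() should touch an output whose commit already succeeded is exactly the open question in this thread, so the unconditional abort loop here reflects this comment's position only.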
[jira] [Comment Edited] (TEZ-714) OutputCommitters should not run in the main AM dispatcher thread
[ https://issues.apache.org/jira/browse/TEZ-714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14386113#comment-14386113 ]

Jeff Zhang edited comment on TEZ-714 at 3/30/15 2:46 AM:

Regarding the partial output: on second thought, I think we should treat it on a per-vertex basis rather than a per-output basis. That means either all of a vertex's outputs commit successfully or all of them are aborted. I think the purpose of TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is to allow an external system to check an intermediate vertex's output at the vertex level. Say a vertex has 2 outputs and TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is false: if the first commit succeeds but the second commit fails, then we should abort both of them and mark the vertex as failed.
[jira] [Comment Edited] (TEZ-714) OutputCommitters should not run in the main AM dispatcher thread
[ https://issues.apache.org/jira/browse/TEZ-714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14386113#comment-14386113 ]

Jeff Zhang edited comment on TEZ-714 at 3/30/15 2:45 AM:

Regarding the partial output: on second thought, I think we should treat it on a per-vertex basis rather than a per-output basis. That means either all of a vertex's outputs commit successfully or all of them are aborted. I think the purpose of TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is to allow an external system to check an intermediate vertex's output at the vertex level. Say a vertex has 2 outputs and TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is false: if the first commit succeeds but the second commit fails, then we should abort both of them and mark the vertex as failed.
[jira] [Comment Edited] (TEZ-714) OutputCommitters should not run in the main AM dispatcher thread
[ https://issues.apache.org/jira/browse/TEZ-714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14386113#comment-14386113 ]

Jeff Zhang edited comment on TEZ-714 at 3/30/15 2:44 AM:

Regarding the partial output: on second thought, I think we should treat it on a per-vertex basis rather than a per-output basis. That means either all of a vertex's outputs commit successfully or all of them are aborted. I think the purpose of TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is to allow an external system to check an intermediate vertex's output. Say a vertex has 2 outputs and TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is false: if the first commit succeeds but the second commit fails, then we should abort both of them and mark the vertex as failed.
[jira] [Comment Edited] (TEZ-714) OutputCommitters should not run in the main AM dispatcher thread
[ https://issues.apache.org/jira/browse/TEZ-714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14386086#comment-14386086 ]

Jeff Zhang edited comment on TEZ-714 at 3/30/15 2:28 AM:

bq. This is a known issue. There is a comment somewhere in the code but I dont think its tracked by a jira.

Created TEZ-2249 to track it.

bq. After the first if() check shouldn't the code simply return vertex.finished(SUCCEEDED) ? Why check again for commitFutures.isEmpty()?

The first if() means that TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is false and the commit has not been started. In this case it calls commitOrFinished, which starts an async commit if there is any committer for this vertex, or just finishes if there is none. The second if() means that either TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is true or all the committers have completed successfully. I may need to refactor the code here to make it clearer.

bq. Why is checkForCompletion called after tasks finished and commit finished? Those 2 sounds like different logical steps and should use separate methods. Perhaps a preCommit() and postCommit() method. If there is nothing to commit then both can be called back to back.

Either task completion or commit completion may be the last step before the finished state (if there is no commit, task completion is the last step; if there is a commit, commit completion is the last step), so both of them trigger checkForCompletion.

bq. Optimistic case: Ignore failed commits and allow all commits to complete (with success or fail). If all commits succeed then proceed to succeeded. If any commit fails, then proceed to failed.

That is not exactly what I am doing. I do not ignore failed commits; instead I cancel the pending commits, wait for them to complete, and then move to the failed state. This behavior is consistent with the other cases where a failure event happens (commit failure, vertex termination event, etc.): all of them cancel pending commits, wait for them to complete, and then move to the finished state.

bq. For the optimistic case, all failures should go into the Terminating state. Errors should not cancel commits.

Why should errors not cancel commits? Is it just because errors have no effect on the data to be committed?

bq. E.g. TaskCompletedAfterVertexSuccessTransition is clearing all successful commits and choosing the pessimistic case. TaskCompletedAfterVertexSuccessTransition is aborting all pending commits. So the logic does not seem to be consistent everywhere. We need to make this consistent. Either choose the pessimistic case or choose the optimistic case.

bq. For the abort method itself, it could take a boolean parameter on whether to abort all or not instead of looking at the successful completions map. This avoids side-effect code like having to clear the successful maps before invoking abortVertex().

I think I made a mistake. It is not necessary to clear the successful maps. Successful commits should not be aborted in any case.
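The behaviour described in the comment above (on a failure, cancel the pending commits, wait for them to finish, then fail) can be sketched with plain java.util.concurrent primitives. This is only an illustration under stated assumptions: the names CancelPendingCommitsSketch and commitAll are hypothetical and are not taken from the TEZ-714 patch or from Tez itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch, not the Tez implementation.
public class CancelPendingCommitsSketch {

  // Runs every commit on the pool. On the first failure it cancels the
  // remaining commits, still waits until each one has finished or been
  // cancelled, and only then reports failure.
  public static boolean commitAll(List<Callable<Void>> commits, ExecutorService pool) {
    ExecutorCompletionService<Void> completed = new ExecutorCompletionService<>(pool);
    List<Future<Void>> futures = new ArrayList<>();
    for (Callable<Void> commit : commits) {
      futures.add(completed.submit(commit));
    }
    boolean failed = false;
    for (int i = 0; i < futures.size(); i++) {
      try {
        // take() blocks until the next commit has completed or been cancelled.
        completed.take().get();
      } catch (ExecutionException | CancellationException e) {
        if (!failed) {
          failed = true;
          // Cancel whatever is still pending or running; finished ones ignore this.
          for (Future<Void> f : futures) {
            f.cancel(true);
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return !failed;
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    List<Callable<Void>> commits = new ArrayList<>();
    commits.add(() -> null);
    commits.add(() -> { throw new java.io.IOException("commit failed"); });
    commits.add(() -> null);
    System.out.println("all committed: " + commitAll(commits, pool));
    pool.shutdown();
  }
}
```

The caller would move to the failed state only after commitAll returns false, which mirrors the "wait for them to complete and then move to failed state" ordering in the comment.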
[jira] [Comment Edited] (TEZ-714) OutputCommitters should not run in the main AM dispatcher thread
[ https://issues.apache.org/jira/browse/TEZ-714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14386086#comment-14386086 ]

Jeff Zhang edited comment on TEZ-714 at 3/30/15 2:15 AM:

bq. This is a known issue. There is a comment somewhere in the code but I dont think its tracked by a jira.

Created TEZ-2249 to track it.

bq. After the first if() check shouldn't the code simply return vertex.finished(SUCCEEDED) ? Why check again for commitFutures.isEmpty()?

The first if() means that TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is false and the commit has not been started. In this case it calls commitOrFinished, which starts an async commit if there is any committer for this vertex, or just finishes if there is none. The second if() means that either TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is true or all the committers have completed successfully. I may need to refactor the code here to make it clearer.

bq. Why is checkForCompletion called after tasks finished and commit finished? Those 2 sounds like different logical steps and should use separate methods. Perhaps a preCommit() and postCommit() method. If there is nothing to commit then both can be called back to back.

Either task completion or commit completion may be the last step before the finished state (if there is no commit, task completion is the last step; if there is a commit, commit completion is the last step), so both of them trigger checkForCompletion.

bq. Optimistic case: Ignore failed commits and allow all commits to complete (with success or fail). If all commits succeed then proceed to succeeded. If any commit fails, then proceed to failed.

That is not exactly what I am doing. I do not ignore failed commits; instead I cancel the pending commits, wait for them to complete, and then move to the failed state. This behavior is consistent with the other cases where a failure event happens (commit failure, vertex termination event, etc.): all of them cancel pending commits, wait for them to complete, and then move to the finished state.

bq. For the optimistic case, all failures should go into the Terminating state. Errors should not cancel commits.

Why should errors not cancel commits? Is it just because errors have no effect on the data to be committed?

bq. E.g. TaskCompletedAfterVertexSuccessTransition is clearing all successful commits and choosing the pessimistic case. TaskCompletedAfterVertexSuccessTransition is aborting all pending commits. So the logic does not seem to be consistent everywhere. We need to make this consistent. Either choose the pessimistic case or choose the optimistic case.

bq. For the abort method itself, it could take a boolean parameter on whether to abort all or not instead of looking at the successful completions map. This avoids side-effect code like having to clear the successful maps before invoking abortVertex().

I think I made a mistake. It is not necessary to clear the successful maps. Successful committers should not be aborted in any case.
[jira] [Comment Edited] (TEZ-714) OutputCommitters should not run in the main AM dispatcher thread
[ https://issues.apache.org/jira/browse/TEZ-714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14383586#comment-14383586 ]

Jeff Zhang edited comment on TEZ-714 at 3/27/15 9:31 AM:

Uploaded a new patch to address the review comments.
* Why is this calling Vertex.abortVertex() instead of directly calling committer.abort() for the outputs? Several reasons:
** It reuses the vertex abort code.
** There is a bug in the existing code: it aborts the committed output when the DAG fails while TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is false, but it is not supposed to abort in this case.
* For both DAG and Vertex, all the commit-related code is wrapped into one CommitCompletedTransition.
* bq. Should we wait for all outstanding commit operations to get cancelled or complete and then call abort on all outputs?
It is done in the new patch, except when an internal error happens; that case still uses the original InternalErrorTransition.
** For Vertex, both the commit completed event and the task completed event call checkVertexForCompletion.
** For DAG, both the commit completed event and the vertex completed event call checkDAGForCompletion.
* Added unit tests in TestCommit and an e2e test in TestMockDAGAppMaster.
* A possible issue: when a vertex's output is being committed as a vertex group output, what state should the vertex be in? Currently the vertex goes to the SUCCEEDED state, but it may be better to move it to the COMMITTING state.
* Should Task wait for all the task attempts to complete before moving to SUCCEEDED? Otherwise, in the case of speculation, the vertex may be in COMMITTING while a task attempt is still running.

was (Author: zjffdu):
The previous version was identical except for the second reason under the first bullet, which read: there's one bug in the existing code that it would abort the committed output when dag is failed, but it is supposed not to abort in this case.

> OutputCommitters should not run in the main AM dispatcher thread
>
> Key: TEZ-714
> URL: https://issues.apache.org/jira/browse/TEZ-714
> Project: Apache Tez
> Issue Type: Improvement
> Reporter: Siddharth Seth
> Assignee: Jeff Zhang
> Priority: Critical
> Attachments: DAG_2.pdf, TEZ-714-1.patch, TEZ-714-2.patch, TEZ-714-3.patch, TEZ-714-4.patch, TEZ-714-5.patch, Vertex_2.pdf
>
> Follow up jira from TEZ-41.
> 1) If there's multiple OutputCommitters on a Vertex, they can be run in parallel.
> 2) Running an OutputCommitter in the main thread blocks all other event handling, w.r.t the DAG, and causes the event queue to back up.
> 3) This should also cover shared commits that happen in the DAG.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (TEZ-714) OutputCommitters should not run in the main AM dispatcher thread
[ https://issues.apache.org/jira/browse/TEZ-714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14379904#comment-14379904 ]

Jeff Zhang edited comment on TEZ-714 at 3/25/15 3:04 PM:

Uploaded a new patch to fix the test failure.

was (Author: zjffdu):
test failed, will check it
[jira] [Comment Edited] (TEZ-714) OutputCommitters should not run in the main AM dispatcher thread
[ https://issues.apache.org/jira/browse/TEZ-714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14377401#comment-14377401 ]

Bikas Saha edited comment on TEZ-714 at 3/24/15 6:54 AM:

bq. It could, but this may make the transition complicated. Currently we need to differentiate these 2 kinds of commits, besides there's 2 possible states (RUNNING, COMMITTING) when the commit happens and we also need check handle 2 different cases (commit succeeded & failure), so there would be totally 8 different cases in one transition which may be difficult to read.

I am looking at the TaskAttemptImpl#TerminatedBeforeRunningTransition state transitions as inspiration. There are some standard things to do when a commit operation completes, e.g. decrement the outstanding commit counter. If the commit was a group commit then write the recovery entry for it. If the commit fails then set a flag to abort. This can be in a base transition, say CommitCompletedTransition. Then we can have CommitCompletedWhileRunningTransition that calls the base for common code and does running-specific stuff, e.g. trigger job failure upon commit failure. And another transition for CommitCompletedWhileCommitting that just waits for the commit counter to drop to 0. Next, CommitCompletedWhileTerminating, which waits for all commit operations to complete and then calls abort (this could be blocking for now). This way we can separate things while still keeping the transitions essentially linear, instead of multiplying the possibilities by (2 commit types x 3 states x 2 commit results).

Perhaps all commit events need to have a shared boolean that they should check before invoking commit. This boolean could be set to false when the vertex/dag decides to abort. This would make any pending commit operations complete quickly instead of trying to commit unnecessarily.

Some e2e scenarios could be tested via simulation using the MockDAGAppMaster. Create custom committers that fail/pass as desired and check that the dag behaved as expected.
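The layered transitions proposed in the comment above could look roughly like the following. This is a sketch under stated assumptions, not code from the patch: the Vertex holder class, its fields, the terminateOrWait helper and the runCommit method are all hypothetical, and the recovery log is reduced to a counter.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the proposed transition layout; not Tez code.
public class CommitTransitionSketch {
  public enum State { RUNNING, COMMITTING, TERMINATING, SUCCEEDED, FAILED }

  public static class Vertex {
    public int outstandingCommits;
    public int recoveryEntries;   // stands in for the recovery log
    public boolean commitFailed;  // the "flag to abort"
    public boolean aborted;
    // Shared flag that every commit checks before doing any work.
    public final AtomicBoolean commitAllowed = new AtomicBoolean(true);
  }

  // Base transition: bookkeeping that is the same in every state.
  public abstract static class CommitCompletedTransition {
    public final State transition(Vertex v, boolean succeeded, boolean groupCommit) {
      v.outstandingCommits--;
      if (groupCommit) {
        v.recoveryEntries++;
      }
      if (!succeeded) {
        v.commitFailed = true;
        v.commitAllowed.set(false);
      }
      return next(v);
    }

    protected abstract State next(Vertex v);

    // Wait for outstanding commits, then abort (inline in this sketch).
    protected static State terminateOrWait(Vertex v) {
      if (v.outstandingCommits > 0) {
        return State.TERMINATING;
      }
      v.aborted = true;
      return State.FAILED;
    }
  }

  public static class CommitCompletedWhileRunning extends CommitCompletedTransition {
    @Override protected State next(Vertex v) {
      return v.commitFailed ? terminateOrWait(v) : State.RUNNING;
    }
  }

  public static class CommitCompletedWhileCommitting extends CommitCompletedTransition {
    @Override protected State next(Vertex v) {
      if (v.commitFailed) {
        return terminateOrWait(v);
      }
      return v.outstandingCommits == 0 ? State.SUCCEEDED : State.COMMITTING;
    }
  }

  public static class CommitCompletedWhileTerminating extends CommitCompletedTransition {
    @Override protected State next(Vertex v) {
      return terminateOrWait(v);
    }
  }

  // What a commit callable would do first: skip the work once abort was decided.
  public static boolean runCommit(Vertex v, Runnable commit) {
    if (!v.commitAllowed.get()) {
      return false;
    }
    commit.run();
    return true;
  }
}
```

Each state-specific class only decides the next state, so the per-state logic stays linear while the shared bookkeeping lives in one place.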
[jira] [Comment Edited] (TEZ-714) OutputCommitters should not run in the main AM dispatcher thread
[ https://issues.apache.org/jira/browse/TEZ-714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14377105#comment-14377105 ]

Bikas Saha edited comment on TEZ-714 at 3/24/15 2:05 AM:

I have not looked at the patch yet because it may change if you agree with these comments.

bq. Regarding the issue of "not sure why group-commit and non-group commit need to be differentiated in different transitions."

Can this be fixed by having different events for the two, but still handling them in the same transition? The transition can check whether it is a group commit event or a normal commit event (based on event type) and then log for the group commit. Maybe the group commit event can derive from the normal commit event. IMO, having fewer transitions makes the code much simpler. Is this recovery log entry relevant only in the non-commit-at-end case, where group commits can happen before the DAG finishes?

bq. Unit test is still not perfect. Because currently in the DAGImpl/VertexImpl we run the shared thread pool in the AsynDispatcher

For these tests we could choose to use the normal thread pool by overriding the setup. Since this is a new test, it can try not to depend on ordering like the existing tests do. If so, then it should be fine to use the real thread pool instead of the fake thread pool that delegates to the dispatcher. Maybe you can create a new TestCommit that starts from scratch without the hacks in TestVertexImpl.

bq. For the some existing transition, like (RUNNING to ERROR due to INTERNAL ERROR)

Is this for VertexImpl or DAGImpl? That sounds like a bug. Is it relevant to the commit operation, though?
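One way to read the suggestion above (a group commit event that derives from the normal commit event, both handled by a single transition) is sketched below. All names here are hypothetical and chosen for illustration; the check is done with instanceof, which is only one possible interpretation of "based on event type", and the recovery log is modelled as a list of strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; class names are illustrative, not from the patch.
public class CommitEventSketch {

  public static class CommitCompletedEvent {
    public final String outputName;
    public final boolean succeeded;

    public CommitCompletedEvent(String outputName, boolean succeeded) {
      this.outputName = outputName;
      this.succeeded = succeeded;
    }
  }

  // Group commit event derives from the normal commit event.
  public static class GroupCommitCompletedEvent extends CommitCompletedEvent {
    public final String groupName;

    public GroupCommitCompletedEvent(String groupName, String outputName, boolean succeeded) {
      super(outputName, succeeded);
      this.groupName = groupName;
    }
  }

  // A single handler for both kinds of event.
  public static class CommitCompletedHandler {
    public final List<String> recoveryLog = new ArrayList<>();
    public int outstandingCommits;

    public void handle(CommitCompletedEvent event) {
      outstandingCommits--;
      // Only the group commit needs an extra recovery entry.
      if (event instanceof GroupCommitCompletedEvent) {
        GroupCommitCompletedEvent group = (GroupCommitCompletedEvent) event;
        recoveryLog.add("group-commit-completed:" + group.groupName);
      }
    }
  }
}
```

With this shape the difference between the two commit kinds is confined to one branch instead of two separate transitions.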
[jira] [Comment Edited] (TEZ-714) OutputCommitters should not run in the main AM dispatcher thread
[ https://issues.apache.org/jira/browse/TEZ-714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14375512#comment-14375512 ]

Jeff Zhang edited comment on TEZ-714 at 3/23/15 8:04 AM:

Uploaded a new patch. [~bikassaha] Please help review it.
* Wrap the commit in a CallableEvent in both DAG and Vertex, but still call the abort inline. Making the abort async would complicate the patch, so it stays a sync call as before.
* Introduce a new state COMMITTING for Vertex and DAG.
** Vertex's COMMITTING means the vertex is in the middle of committing. If the vertex has no committers or TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is true, the vertex does not go to the COMMITTING state.
** DAG's COMMITTING has 2 cases: one is when TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is true and all the vertices are completed; the other is when TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is false and all the vertices are completed, but some vertex group committers are still running.
* Regarding the issue of "not sure why group-commit and non-group commit need to be differentiated in different transitions.", I renamed them to NonFinalCommitCompletedTransition and FinalCommitCompletetionTransition (maybe there are better names). One means the commit when TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is false and the other means the commit when it is true. The reason I differentiate them is that for the NonFinalCommitCompletedEvent we need to write the VertexGroupCommitCompletedEvent to the recovery log, while that is not necessary for the FinalCommitCompletedEvent.
* The unit test is still not perfect. Currently in DAGImpl/VertexImpl the tests run the shared thread pool in the AsyncDispatcher thread (which means the committer still runs in the AsyncDispatcher thread), so this may hide some potential issues, and in this thread mode it is not possible to test some cases, such as killing the DAG while it is committing. I am trying to think of ways to simulate the shared thread pool in the unit test.
* For some existing transitions, like (RUNNING to ERROR due to INTERNAL ERROR), I am not sure why they go to ERROR directly rather than to TERMINATING. Maybe it is to allow the client to get the final status as early as possible.
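The rules for entering the new COMMITTING state, as stated in the comment above, can be written down as two small decision functions. This is a sketch under stated assumptions: the method and parameter names are hypothetical, and the branch for a DAG with nothing to commit in commit-on-DAG-success mode is an assumption, since the comment does not cover it.

```java
// Hypothetical sketch of the COMMITTING rules; not the patch's code.
public class CommittingStateSketch {
  public enum State { COMMITTING, SUCCEEDED }

  // Vertex: enters COMMITTING only if it has committers and commits are
  // not deferred until DAG success.
  public static State vertexStateAfterTasks(int committers, boolean commitOnDagSuccess) {
    if (committers == 0 || commitOnDagSuccess) {
      return State.SUCCEEDED;
    }
    return State.COMMITTING;
  }

  // DAG, once all vertices have completed.
  public static State dagStateAfterVertices(boolean commitOnDagSuccess,
                                            int dagCommitters,
                                            int runningGroupCommits) {
    if (commitOnDagSuccess) {
      // Assumption: with no committers at all there is nothing to wait for.
      return dagCommitters > 0 ? State.COMMITTING : State.SUCCEEDED;
    }
    // Commits already ran per vertex; only group commits may still be running.
    return runningGroupCommits > 0 ? State.COMMITTING : State.SUCCEEDED;
  }
}
```

Here commitOnDagSuccess stands for the value of TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS.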