Repository: eagle Updated Branches: refs/heads/master c9c475e2a -> 02d6cce73
[MINOR] optimize mr running job list api Author: wujinhu <[email protected]> Closes #768 from wujinhu/EAGLE-842. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/02d6cce7 Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/02d6cce7 Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/02d6cce7 Branch: refs/heads/master Commit: 02d6cce73679ce90fe610dbdb30fd1cbdd062897 Parents: c9c475e Author: wujinhu <[email protected]> Authored: Mon Jan 9 22:10:16 2017 +0800 Committer: wujinhu <[email protected]> Committed: Mon Jan 9 22:10:16 2017 +0800 ---------------------------------------------------------------------- .../eagle/jpm/mr/running/parser/MRJobParser.java | 11 ++++------- .../eagle/jpm/mr/running/parser/MRJobParserTest.java | 14 +++++++------- 2 files changed, 11 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/02d6cce7/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java index d866c1c..52c1866 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java @@ -142,10 +142,9 @@ public class MRJobParser implements Runnable { if (fetchMRJobs()) { break; } else if (i >= MAX_RETRY_TIMES - 1) { - //check whether the app has finished. if we test that we can connect rm, then we consider the jobs have finished - //if we get here either because of cannot connect rm or the jobs have finished - rmResourceFetcher.getResource(Constants.ResourceType.RUNNING_MR_JOB); - mrJobEntityMap.keySet().forEach(this::finishMRJob); + if (app.getState().equals(Constants.AppState.FINISHED.toString())) { + mrJobEntityMap.keySet().forEach(this::finishMRJob); + } return; } } @@ -166,9 +165,6 @@ public class MRJobParser implements Runnable { } } if (i >= MAX_RETRY_TIMES) { - //may caused by rm unreachable - rmResourceFetcher.getResource(Constants.ResourceType.RUNNING_MR_JOB); - finishMRJob(jobId); break; } } @@ -575,6 +571,7 @@ public class MRJobParser implements Runnable { //delete from zk if needed mrJobEntityMap.keySet() .stream() + .filter(jobId -> mrJobEntityMap.get(jobId).getInternalState() != null) .filter( jobId -> mrJobEntityMap.get(jobId).getInternalState().equals(Constants.AppState.FINISHED.toString()) || mrJobEntityMap.get(jobId).getInternalState().equals(Constants.AppState.FAILED.toString())) http://git-wip-us.apache.org/repos/asf/eagle/blob/02d6cce7/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java index a2fb6ca..e0b5533 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java @@ -335,9 +335,9 @@ public class MRJobParserTest { Assert.assertTrue(jobIdToJobConfig.isEmpty()); Assert.assertTrue(jobIdToJobExecutionAPIEntity.size() == 1); JobExecutionAPIEntity jobExecutionAPIEntity = jobIdToJobExecutionAPIEntity.get(JOB_ID); - Assert.assertEquals(Constants.AppState.FINISHED.toString(), jobExecutionAPIEntity.getInternalState()); + Assert.assertEquals(Constants.AppState.RUNNING.toString(), jobExecutionAPIEntity.getInternalState()); Assert.assertEquals(Constants.AppState.RUNNING.toString(), jobExecutionAPIEntity.getCurrentState()); - Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.APP_FINISHED); + Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED); Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null); Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) == null); Assert.assertTrue(entities.isEmpty()); @@ -391,14 +391,14 @@ public class MRJobParserTest { mrJobParser.run(); - Assert.assertTrue(jobIdToJobConfig.isEmpty()); + Assert.assertTrue(!jobIdToJobConfig.isEmpty()); Assert.assertTrue(jobIdToJobExecutionAPIEntity.size() == 1); JobExecutionAPIEntity jobExecutionAPIEntity = jobIdToJobExecutionAPIEntity.get(JOB_ID); - Assert.assertEquals(Constants.AppState.FINISHED.toString(), jobExecutionAPIEntity.getInternalState()); + Assert.assertEquals(Constants.AppState.RUNNING.toString(), jobExecutionAPIEntity.getInternalState()); Assert.assertEquals(Constants.AppState.RUNNING.toString(), jobExecutionAPIEntity.getCurrentState()); - Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.APP_FINISHED); - Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null); - Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) == null); + Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED); + Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) != null); + Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) != null); Assert.assertTrue(entities.isEmpty()); verify(client, times(2)).create(any()); verify(client, times(1)).getJerseyClient();
