Repository: eagle Updated Branches: refs/heads/master 73d03b9e5 -> 3027e5fbc
[MINOR] remove useless code and fix fetch running job config time out Author: wujinhu <[email protected]> Closes #809 from wujinhu/EAGLE-844. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/3027e5fb Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/3027e5fb Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/3027e5fb Branch: refs/heads/master Commit: 3027e5fbc7d30f938371c17caddb3b2d91ad5b54 Parents: 73d03b9 Author: wujinhu <[email protected]> Authored: Mon Feb 20 11:23:09 2017 +0800 Committer: wujinhu <[email protected]> Committed: Mon Feb 20 11:23:09 2017 +0800 ---------------------------------------------------------------------- .../jpm/mr/running/MRRunningJobApplication.java | 3 +-- .../eagle/jpm/mr/running/parser/MRJobParser.java | 15 ++++++--------- .../jpm/mr/running/storm/MRRunningJobParseBolt.java | 10 ++-------- .../jpm/mr/running/MRRunningJobApplicationTest.java | 3 +-- .../jpm/mr/running/parser/MRJobParserTest.java | 16 ++++++++-------- 5 files changed, 18 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/3027e5fb/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java index 309146e..7b1e2fb 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java @@ -68,8 +68,7 @@ public class MRRunningJobApplication extends StormApplication { mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getZkStateConfig(), confKeyKeys, - config, - new MRJobPerformanceAnalyzer(config)), + config), tasks).setNumTasks(tasks).fieldsGrouping(spoutName, new Fields("appId")); return topologyBuilder.createTopology(); } http://git-wip-us.apache.org/repos/asf/eagle/blob/3027e5fb/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java index 525ffc2..0f2ede6 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java @@ -77,14 +77,12 @@ public class MRJobParser implements Runnable { private Map<String, String> commonTags = new HashMap<>(); private MRRunningJobManager runningJobManager; private ParserStatus parserStatus; - private ResourceFetcher rmResourceFetcher; private Set<String> finishedTaskIds; private List<String> configKeys; private static final int TOP_BOTTOM_TASKS_BY_ELAPSED_TIME = 10; private static final int FLUSH_TASKS_EVERY_TIME = 5; private static final int MAX_TASKS_PERMIT = 5000; private Config config; - private MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer; static { OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true); @@ -93,10 +91,9 @@ public class MRJobParser implements Runnable { public MRJobParser(MRRunningJobConfig.EndpointConfig endpointConfig, MRRunningJobConfig.EagleServiceConfig eagleServiceConfig, AppInfo app, Map<String, JobExecutionAPIEntity> mrJobMap, - MRRunningJobManager runningJobManager, ResourceFetcher rmResourceFetcher, + MRRunningJobManager runningJobManager, List<String> configKeys, - Config config, - MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer) { + Config config) { this.app = app; if (mrJobMap == null) { this.mrJobEntityMap = new HashMap<>(); @@ -112,11 +109,9 @@ public class MRJobParser implements Runnable { this.commonTags.put(MRJobTagName.JOB_QUEUE.toString(), app.getQueue()); this.runningJobManager = runningJobManager; this.parserStatus = ParserStatus.FINISHED; - this.rmResourceFetcher = rmResourceFetcher; this.finishedTaskIds = new HashSet<>(); this.configKeys = configKeys; this.config = config; - this.mrJobPerformanceAnalyzer = mrJobPerformanceAnalyzer; } public void setAppInfo(AppInfo app) { @@ -175,7 +170,6 @@ public class MRJobParser implements Runnable { break; } } - mrJobPerformanceAnalyzer.analyze(convertToAnalysisEntity(mrJobEntityMap.get(jobId))); } } @@ -572,7 +566,10 @@ public class MRJobParser implements Runnable { LOG.warn("exception found when process application {}, {}", app.getId(), e); } finally { for (String jobId : mrJobEntityMap.keySet()) { - mrJobEntityCreationHandler.add(mrJobEntityMap.get(jobId)); + JobExecutionAPIEntity entity = mrJobEntityMap.get(jobId); + if (entity.getTags().containsKey(MRJobTagName.JOB_TYPE.toString())) { + mrJobEntityCreationHandler.add(entity); + } } if (mrJobEntityCreationHandler.flush()) { //force flush //we must flush entities before delete from zk in case of missing finish state of jobs http://git-wip-us.apache.org/repos/asf/eagle/blob/3027e5fb/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java index 915df8a..a8db603 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java @@ -49,24 +49,20 @@ public class MRRunningJobParseBolt extends BaseRichBolt { private Map<String, MRJobParser> runningMRParsers; private transient MRRunningJobManager runningJobManager; private MRRunningJobConfig.EagleServiceConfig eagleServiceConfig; - private ResourceFetcher resourceFetcher; private List<String> configKeys; private Config config; - private MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer; public MRRunningJobParseBolt(MRRunningJobConfig.EagleServiceConfig eagleServiceConfig, MRRunningJobConfig.EndpointConfig endpointConfig, MRRunningJobConfig.ZKStateConfig zkStateConfig, List<String> configKeys, - Config config, - MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer) { + Config config) { this.eagleServiceConfig = eagleServiceConfig; this.endpointConfig = endpointConfig; this.runningMRParsers = new HashMap<>(); this.zkStateConfig = zkStateConfig; this.configKeys = configKeys; this.config = config; - this.mrJobPerformanceAnalyzer = mrJobPerformanceAnalyzer; } @Override @@ -74,7 +70,6 @@ public class MRRunningJobParseBolt extends BaseRichBolt { this.executorService = Executors.newFixedThreadPool(endpointConfig.parseJobThreadPoolSize); this.runningJobManager = new MRRunningJobManager(zkStateConfig); - this.resourceFetcher = new RMResourceFetcher(endpointConfig.rmUrls); } @Override @@ -87,8 +82,7 @@ public class MRRunningJobParseBolt extends BaseRichBolt { MRJobParser applicationParser; if (!runningMRParsers.containsKey(appInfo.getId())) { applicationParser = new MRJobParser(endpointConfig, eagleServiceConfig, - appInfo, mrJobs, runningJobManager, this.resourceFetcher, configKeys, this.config, - mrJobPerformanceAnalyzer); + appInfo, mrJobs, runningJobManager, configKeys, this.config); runningMRParsers.put(appInfo.getId(), applicationParser); LOG.info("create application parser for {}", appInfo.getId()); } else { http://git-wip-us.apache.org/repos/asf/eagle/blob/3027e5fb/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java index f4bd2fa..5ebd9c5 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java @@ -89,8 +89,7 @@ public class MRRunningJobApplicationTest { mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getZkStateConfig(), confKeyKeys, - config, - new MRJobPerformanceAnalyzer(config)); + config); MRRunningJobManager mrRunningJobManager = mock(MRRunningJobManager.class); PowerMockito.whenNew(MRRunningJobManager.class).withArguments(mrRunningJobConfig.getZkStateConfig()).thenReturn(mrRunningJobManager); mrRunningJobParseBolt.prepare(null, null, null); http://git-wip-us.apache.org/repos/asf/eagle/blob/3027e5fb/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java index a4748ac..227eecb 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java @@ -132,7 +132,7 @@ public class MRJobParserTest { MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig()); RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls); MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(), - app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config)); + app1, mrJobs, runningJobManager, confKeyKeys, config); Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser); @@ -187,7 +187,7 @@ public class MRJobParserTest { MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig()); RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls); MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(), - app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config)); + app1, mrJobs, runningJobManager, confKeyKeys, config); Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser); @@ -229,7 +229,7 @@ public class MRJobParserTest { MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig()); RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls); MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(), - app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config)); + app1, mrJobs, runningJobManager, confKeyKeys, config); Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser); @@ -248,7 +248,7 @@ public class MRJobParserTest { Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null); Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) == null); Assert.assertTrue(entities.isEmpty()); - verify(client, times(1)).create(any()); + verify(client, times(0)).create(any()); } @@ -273,7 +273,7 @@ public class MRJobParserTest { MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig()); RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls); MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(), - app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config)); + app1, mrJobs, runningJobManager, confKeyKeys, config); Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser); @@ -319,7 +319,7 @@ public class MRJobParserTest { RMResourceFetcher resourceFetcher = mock(RMResourceFetcher.class); when(resourceFetcher.getResource(any())).thenReturn(Collections.emptyList()); MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(), - app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config)); + app1, mrJobs, runningJobManager, confKeyKeys, config); Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser); @@ -342,7 +342,7 @@ public class MRJobParserTest { Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null); Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) == null); Assert.assertTrue(entities.isEmpty()); - verify(client, times(1)).create(any()); + verify(client, times(0)).create(any()); } @@ -378,7 +378,7 @@ public class MRJobParserTest { RMResourceFetcher resourceFetcher = mock(RMResourceFetcher.class); when(resourceFetcher.getResource(any())).thenReturn(Collections.emptyList()); MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(), - app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config)); + app1, mrJobs, runningJobManager, confKeyKeys, config); Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
