Repository: eagle Updated Branches: refs/heads/master 95a506b44 -> 0cda01b58
MINOR: fix bugs in HadoopYarnResourceUtils Author: Zhao, Qingwen <[email protected]> Closes #888 from qingwen220/minor. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/0cda01b5 Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/0cda01b5 Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/0cda01b5 Branch: refs/heads/master Commit: 0cda01b58f4a4bb06a1ab71bae597fdc4f7c39cf Parents: 95a506b Author: Zhao, Qingwen <[email protected]> Authored: Mon Mar 20 20:22:01 2017 +0800 Committer: Zhao, Qingwen <[email protected]> Committed: Mon Mar 20 20:22:01 2017 +0800 ---------------------------------------------------------------------- .../queue/common/HadoopYarnResourceUtils.java | 12 ++- .../queue/crawler/ClusterMetricsCrawler.java | 2 +- .../queue/crawler/RunningAppsCrawler.java | 2 +- .../queue/crawler/SchedulerInfoCrawler.java | 2 +- .../queue/TestHadoopYarnResourceUtils.java | 6 +- .../util/resourcefetch/RMResourceFetcher.java | 104 ++++++++----------- .../resourcefetch/ha/HAURLSelectorImpl.java | 2 + 7 files changed, 63 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/0cda01b5/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java index 59bd535..b7c3a21 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java @@ -34,11 +34,11 @@ public class HadoopYarnResourceUtils { OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true); } - public static Object getObjectFromStreamWithGzip(String urlString, Class<?> clazz) throws Exception { + public static Object getObjectFromUrlStream(String urlString, Class<?> clazz) throws Exception { InputStream is = null; Object o = null; try { - is = InputStreamUtils.getInputStream(urlString, null, Constants.CompressionType.GZIP); + is = readStream(urlString); o = OBJ_MAPPER.readValue(is, clazz); } catch (Exception e) { throw new IllegalArgumentException(String.format("Fetch resource %s failed", urlString), e); @@ -50,6 +50,14 @@ public class HadoopYarnResourceUtils { return o; } + private static InputStream readStream(String urlString) throws Exception { + try { + return InputStreamUtils.getInputStream(urlString, null, Constants.CompressionType.GZIP); + } catch (java.util.zip.ZipException ex) { + return InputStreamUtils.getInputStream(urlString, null, Constants.CompressionType.NONE); + } + } + public static String getConfigValue(Config eagleConf, String key, String defaultValue) { if (eagleConf.hasPath(key)) { return eagleConf.getString(key); http://git-wip-us.apache.org/repos/asf/eagle/blob/0cda01b5/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsCrawler.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsCrawler.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsCrawler.java index ac2f0f5..bde8bcd 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsCrawler.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsCrawler.java @@ -43,7 +43,7 @@ public class ClusterMetricsCrawler implements Runnable { public void run() { try { logger.info("Start to crawl cluster metrics from " + this.urlString); - ClusterMetricsWrapper metricsWrapper = (ClusterMetricsWrapper) HadoopYarnResourceUtils.getObjectFromStreamWithGzip(urlString, ClusterMetricsWrapper.class); + ClusterMetricsWrapper metricsWrapper = (ClusterMetricsWrapper) HadoopYarnResourceUtils.getObjectFromUrlStream(urlString, ClusterMetricsWrapper.class); ClusterMetrics metrics = metricsWrapper.getClusterMetrics(); if (metrics == null) { logger.error("Failed to crawl cluster metrics"); http://git-wip-us.apache.org/repos/asf/eagle/blob/0cda01b5/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java index 6629408..3dac4e5 100755 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java @@ -47,7 +47,7 @@ public class RunningAppsCrawler implements Runnable { public void run() { try { logger.info("Start to crawl app metrics from " + this.urlString); - AppsWrapper appsWrapper = (AppsWrapper) HadoopYarnResourceUtils.getObjectFromStreamWithGzip(urlString, AppsWrapper.class); + AppsWrapper appsWrapper = (AppsWrapper) HadoopYarnResourceUtils.getObjectFromUrlStream(urlString, AppsWrapper.class); if (appsWrapper == null || appsWrapper.getApps() == null) { logger.error("Failed to crawl running applications with api = " + urlString); } else { http://git-wip-us.apache.org/repos/asf/eagle/blob/0cda01b5/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoCrawler.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoCrawler.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoCrawler.java index 16e644a..70a539b 100755 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoCrawler.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoCrawler.java @@ -45,7 +45,7 @@ public class SchedulerInfoCrawler implements Runnable { try { //https://some.server.address:50030/ws/v1/cluster/scheduler?anonymous=true logger.info("Start to crawl cluster scheduler queues from " + this.urlString); - SchedulerWrapper schedulerWrapper = (SchedulerWrapper) HadoopYarnResourceUtils.getObjectFromStreamWithGzip(urlString, SchedulerWrapper.class); + SchedulerWrapper schedulerWrapper = (SchedulerWrapper) HadoopYarnResourceUtils.getObjectFromUrlStream(urlString, SchedulerWrapper.class); if (schedulerWrapper == null || schedulerWrapper.getScheduler() == null) { logger.error("Failed to crawl scheduler info with url = " + this.urlString); } else { http://git-wip-us.apache.org/repos/asf/eagle/blob/0cda01b5/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/TestHadoopYarnResourceUtils.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/TestHadoopYarnResourceUtils.java b/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/TestHadoopYarnResourceUtils.java index 06af0f3..b299248 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/TestHadoopYarnResourceUtils.java +++ b/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/TestHadoopYarnResourceUtils.java @@ -41,9 +41,9 @@ public class TestHadoopYarnResourceUtils { String schedulerUrl = YarnClusterResourceURLBuilder.buildSchedulerInfoURL(baseUrl); try { - ClusterMetricsWrapper clusterMetrics = (ClusterMetricsWrapper) HadoopYarnResourceUtils.getObjectFromStreamWithGzip(clusterMetricUrl, ClusterMetricsWrapper.class); - AppsWrapper appsWrapper = (AppsWrapper) HadoopYarnResourceUtils.getObjectFromStreamWithGzip(finishedAppsUrl, AppsWrapper.class); - SchedulerWrapper schedulerWrapper = (SchedulerWrapper) HadoopYarnResourceUtils.getObjectFromStreamWithGzip(schedulerUrl, SchedulerWrapper.class); + ClusterMetricsWrapper clusterMetrics = (ClusterMetricsWrapper) HadoopYarnResourceUtils.getObjectFromUrlStream(clusterMetricUrl, ClusterMetricsWrapper.class); + AppsWrapper appsWrapper = (AppsWrapper) HadoopYarnResourceUtils.getObjectFromUrlStream(finishedAppsUrl, AppsWrapper.class); + SchedulerWrapper schedulerWrapper = (SchedulerWrapper) HadoopYarnResourceUtils.getObjectFromUrlStream(schedulerUrl, SchedulerWrapper.class); Assert.assertTrue(appsWrapper != null); Assert.assertTrue(clusterMetrics != null); Assert.assertTrue(schedulerWrapper != null); http://git-wip-us.apache.org/repos/asf/eagle/blob/0cda01b5/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java index 5b4e39a..ab641d9 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java @@ -19,7 +19,6 @@ */ package org.apache.eagle.jpm.util.resourcefetch; -import com.fasterxml.jackson.databind.util.ContainerBuilder; import org.apache.commons.lang3.StringUtils; import org.apache.eagle.common.DateTimeUtil; import org.apache.eagle.jpm.util.Constants; @@ -65,7 +64,7 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> { return selector; } - private List<AppInfo> doFetchApplicationsList(String urlString, Constants.CompressionType compressionType) { + private List<AppInfo> doFetchApplicationsList(String urlString, Constants.CompressionType compressionType) throws Exception { List<AppInfo> result = new ArrayList<>(); InputStream is = null; try { @@ -77,8 +76,6 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> { result = appWrapper.getApps().getApp(); } LOG.info("Successfully fetched {} AppInfos from {}", result.size(), urlString); - } catch (Exception e) { - LOG.error("Fail to query {} due to {}", urlString, e.getMessage()); } finally { if (is != null) { try { @@ -135,75 +132,64 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> { private List<AppInfo> doFetchRunningApplicationsList(Constants.JobType jobType, Constants.CompressionType compressionType, Object... parameter) throws Exception { - Map<String, AppInfo> result = new HashMap(); - List<AppInfo> apps = new ArrayList<>(); - try { - String limit = ""; - int requests = 1; - int timeRangePerRequestInMin = 60; - - switch (parameter.length) { - case 0 : - String urlString = getRunningJobURL(jobType, null, null, null); - return doFetchApplicationsList(urlString, compressionType); - case 1 : - limit = String.valueOf(parameter[0]); - break; - case 2 : - limit = String.valueOf(parameter[0]); - requests = (int) parameter[1]; - break; - case 3 : - limit = String.valueOf(parameter[0]); - requests = (int) parameter[1]; - timeRangePerRequestInMin = (int) parameter[2]; - break; - default : - throw new InvalidParameterException("parameter list: limit, requests, requestTimeRange"); - } + String limit = ""; + int requests = 1; + int timeRangePerRequestInMin = 60; - if (requests <= 1) { - String urlString = getRunningJobURL(jobType, null, null, limit); + switch (parameter.length) { + case 0 : + String urlString = getRunningJobURL(jobType, null, null, null); return doFetchApplicationsList(urlString, compressionType); - } + case 1 : + limit = String.valueOf(parameter[0]); + break; + case 2 : + limit = String.valueOf(parameter[0]); + requests = (int) parameter[1]; + break; + case 3 : + limit = String.valueOf(parameter[0]); + requests = (int) parameter[1]; + timeRangePerRequestInMin = (int) parameter[2]; + break; + default : + throw new InvalidParameterException("parameter list: limit, requests, requestTimeRange"); + } + + if (requests <= 1) { + String urlString = getRunningJobURL(jobType, null, null, limit); + return doFetchApplicationsList(urlString, compressionType); + } - long interval = timeRangePerRequestInMin * DateTimeUtil.ONEMINUTE; - long currentTime = System.currentTimeMillis() - interval; + long interval = timeRangePerRequestInMin * DateTimeUtil.ONEMINUTE; + long currentTime = System.currentTimeMillis() - interval; - List<String> requestUrls = new ArrayList<>(); - requestUrls.add(getRunningJobURL(jobType, String.valueOf(currentTime), null, limit)); + List<String> requestUrls = new ArrayList<>(); + requestUrls.add(getRunningJobURL(jobType, String.valueOf(currentTime), null, limit)); - for (int cnt = 2; cnt < requests; cnt++) { - long start = currentTime - interval; - requestUrls.add(getRunningJobURL(jobType, String.valueOf(start), String.valueOf(currentTime), limit)); - currentTime -= interval; - } + for (int cnt = 2; cnt < requests; cnt++) { + long start = currentTime - interval; + requestUrls.add(getRunningJobURL(jobType, String.valueOf(start), String.valueOf(currentTime), limit)); + currentTime -= interval; + } - requestUrls.add(getRunningJobURL(jobType, null, String.valueOf(currentTime), limit)); - LOG.info("{} requests to fetch running MapReduce applications: \n{}", requestUrls.size(), - StringUtils.join(requestUrls, "\n")); + requestUrls.add(getRunningJobURL(jobType, null, String.valueOf(currentTime), limit)); + LOG.info("{} requests to fetch running MapReduce applications: \n{}", requestUrls.size(), + StringUtils.join(requestUrls, "\n")); - requestUrls.forEach(query -> - doFetchApplicationsList(query, compressionType).forEach(app -> result.put(app.getId(), app)) - ); - } catch (Exception e) { - LOG.error("Catch an exception when query url{} : {}", selector.getSelectedUrl(), e.getMessage(), e); - return apps; + Map<String, AppInfo> result = new HashMap(); + for (String query : requestUrls) { + doFetchApplicationsList(query, compressionType).forEach(app -> result.put(app.getId(), app)); } + List<AppInfo> apps = new ArrayList<>(); apps.addAll(result.values()); return apps; } private List<AppInfo> doFetchAcceptedApplicationList(Constants.CompressionType compressionType, Object... parameter) throws Exception { - List<AppInfo> apps = new ArrayList<>(); - try { - String url = getAcceptedAppURL(parameter); - return doFetchApplicationsList(url, compressionType); - } catch (Exception e) { - LOG.error("Catch an exception when query {} : {}", selector.getSelectedUrl(), e.getMessage(), e); - } - return apps; + String url = getAcceptedAppURL(parameter); + return doFetchApplicationsList(url, compressionType); } private List<AppInfo> getResource(Constants.ResourceType resourceType, Constants.CompressionType compressionType, Object... parameter) throws Exception { http://git-wip-us.apache.org/repos/asf/eagle/blob/0cda01b5/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImpl.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImpl.java index fff7a1b..1f49cc1 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImpl.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImpl.java @@ -16,6 +16,7 @@ */ package org.apache.eagle.jpm.util.resourcefetch.ha; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils; import org.apache.eagle.jpm.util.resourcefetch.url.RmActiveTestURLBuilderImpl; @@ -38,6 +39,7 @@ public class HAURLSelectorImpl implements HAURLSelector { private final Constants.CompressionType compressionType; private static final long MAX_RETRY_TIME = 2; private static final Logger LOG = LoggerFactory.getLogger(HAURLSelectorImpl.class); + private static ObjectMapper OBJ_MAPPER = new ObjectMapper(); public HAURLSelectorImpl(String[] urls, Constants.CompressionType compressionType) { this.urls = urls;
