Repository: eagle
Updated Branches:
  refs/heads/master 95a506b44 -> 0cda01b58


MINOR: fix bugs in HadoopYarnResourceUtils

Author: Zhao, Qingwen <[email protected]>

Closes #888 from qingwen220/minor.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/0cda01b5
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/0cda01b5
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/0cda01b5

Branch: refs/heads/master
Commit: 0cda01b58f4a4bb06a1ab71bae597fdc4f7c39cf
Parents: 95a506b
Author: Zhao, Qingwen <[email protected]>
Authored: Mon Mar 20 20:22:01 2017 +0800
Committer: Zhao, Qingwen <[email protected]>
Committed: Mon Mar 20 20:22:01 2017 +0800

----------------------------------------------------------------------
 .../queue/common/HadoopYarnResourceUtils.java   |  12 ++-
 .../queue/crawler/ClusterMetricsCrawler.java    |   2 +-
 .../queue/crawler/RunningAppsCrawler.java       |   2 +-
 .../queue/crawler/SchedulerInfoCrawler.java     |   2 +-
 .../queue/TestHadoopYarnResourceUtils.java      |   6 +-
 .../util/resourcefetch/RMResourceFetcher.java   | 104 ++++++++-----------
 .../resourcefetch/ha/HAURLSelectorImpl.java     |   2 +
 7 files changed, 63 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/0cda01b5/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java
 
b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java
index 59bd535..b7c3a21 100644
--- 
a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java
+++ 
b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java
@@ -34,11 +34,11 @@ public class HadoopYarnResourceUtils {
         OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, 
true);
     }
 
-    public static Object getObjectFromStreamWithGzip(String urlString, 
Class<?> clazz) throws Exception {
+    public static Object getObjectFromUrlStream(String urlString, Class<?> 
clazz) throws Exception {
         InputStream is = null;
         Object o = null;
         try {
-            is = InputStreamUtils.getInputStream(urlString, null, 
Constants.CompressionType.GZIP);
+            is = readStream(urlString);
             o = OBJ_MAPPER.readValue(is, clazz);
         } catch (Exception e) {
             throw new IllegalArgumentException(String.format("Fetch resource 
%s failed", urlString), e);
@@ -50,6 +50,14 @@ public class HadoopYarnResourceUtils {
         return o;
     }
 
+    private static InputStream readStream(String urlString) throws Exception {
+        try {
+            return InputStreamUtils.getInputStream(urlString, null, 
Constants.CompressionType.GZIP);
+        } catch (java.util.zip.ZipException ex) {
+            return InputStreamUtils.getInputStream(urlString, null, 
Constants.CompressionType.NONE);
+        }
+    }
+
     public static String getConfigValue(Config eagleConf, String key, String 
defaultValue) {
         if (eagleConf.hasPath(key)) {
             return eagleConf.getString(key);

http://git-wip-us.apache.org/repos/asf/eagle/blob/0cda01b5/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsCrawler.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsCrawler.java
 
b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsCrawler.java
index ac2f0f5..bde8bcd 100644
--- 
a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsCrawler.java
+++ 
b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsCrawler.java
@@ -43,7 +43,7 @@ public class ClusterMetricsCrawler implements Runnable {
     public void run() {
         try {
             logger.info("Start to crawl cluster metrics from " + 
this.urlString);
-            ClusterMetricsWrapper metricsWrapper = (ClusterMetricsWrapper) 
HadoopYarnResourceUtils.getObjectFromStreamWithGzip(urlString, 
ClusterMetricsWrapper.class);
+            ClusterMetricsWrapper metricsWrapper = (ClusterMetricsWrapper) 
HadoopYarnResourceUtils.getObjectFromUrlStream(urlString, 
ClusterMetricsWrapper.class);
             ClusterMetrics metrics = metricsWrapper.getClusterMetrics();
             if (metrics == null) {
                 logger.error("Failed to crawl cluster metrics");

http://git-wip-us.apache.org/repos/asf/eagle/blob/0cda01b5/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java
 
b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java
index 6629408..3dac4e5 100755
--- 
a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java
+++ 
b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java
@@ -47,7 +47,7 @@ public class RunningAppsCrawler implements Runnable {
     public void run() {
         try {
             logger.info("Start to crawl app metrics from " + this.urlString);
-            AppsWrapper appsWrapper = (AppsWrapper) 
HadoopYarnResourceUtils.getObjectFromStreamWithGzip(urlString, 
AppsWrapper.class);
+            AppsWrapper appsWrapper = (AppsWrapper) 
HadoopYarnResourceUtils.getObjectFromUrlStream(urlString, AppsWrapper.class);
             if (appsWrapper == null || appsWrapper.getApps() == null) {
                 logger.error("Failed to crawl running applications with api = 
" + urlString);
             } else {

http://git-wip-us.apache.org/repos/asf/eagle/blob/0cda01b5/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoCrawler.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoCrawler.java
 
b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoCrawler.java
index 16e644a..70a539b 100755
--- 
a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoCrawler.java
+++ 
b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoCrawler.java
@@ -45,7 +45,7 @@ public class SchedulerInfoCrawler implements Runnable {
         try {
             
//https://some.server.address:50030/ws/v1/cluster/scheduler?anonymous=true
             logger.info("Start to crawl cluster scheduler queues from " + 
this.urlString);
-            SchedulerWrapper schedulerWrapper = (SchedulerWrapper) 
HadoopYarnResourceUtils.getObjectFromStreamWithGzip(urlString, 
SchedulerWrapper.class);
+            SchedulerWrapper schedulerWrapper = (SchedulerWrapper) 
HadoopYarnResourceUtils.getObjectFromUrlStream(urlString, 
SchedulerWrapper.class);
             if (schedulerWrapper == null || schedulerWrapper.getScheduler() == 
null) {
                 logger.error("Failed to crawl scheduler info with url = " + 
this.urlString);
             } else {

http://git-wip-us.apache.org/repos/asf/eagle/blob/0cda01b5/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/TestHadoopYarnResourceUtils.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/TestHadoopYarnResourceUtils.java
 
b/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/TestHadoopYarnResourceUtils.java
index 06af0f3..b299248 100644
--- 
a/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/TestHadoopYarnResourceUtils.java
+++ 
b/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/TestHadoopYarnResourceUtils.java
@@ -41,9 +41,9 @@ public class TestHadoopYarnResourceUtils {
         String schedulerUrl = 
YarnClusterResourceURLBuilder.buildSchedulerInfoURL(baseUrl);
 
         try {
-            ClusterMetricsWrapper clusterMetrics = (ClusterMetricsWrapper) 
HadoopYarnResourceUtils.getObjectFromStreamWithGzip(clusterMetricUrl, 
ClusterMetricsWrapper.class);
-            AppsWrapper appsWrapper = (AppsWrapper) 
HadoopYarnResourceUtils.getObjectFromStreamWithGzip(finishedAppsUrl, 
AppsWrapper.class);
-            SchedulerWrapper schedulerWrapper = (SchedulerWrapper) 
HadoopYarnResourceUtils.getObjectFromStreamWithGzip(schedulerUrl, 
SchedulerWrapper.class);
+            ClusterMetricsWrapper clusterMetrics = (ClusterMetricsWrapper) 
HadoopYarnResourceUtils.getObjectFromUrlStream(clusterMetricUrl, 
ClusterMetricsWrapper.class);
+            AppsWrapper appsWrapper = (AppsWrapper) 
HadoopYarnResourceUtils.getObjectFromUrlStream(finishedAppsUrl, 
AppsWrapper.class);
+            SchedulerWrapper schedulerWrapper = (SchedulerWrapper) 
HadoopYarnResourceUtils.getObjectFromUrlStream(schedulerUrl, 
SchedulerWrapper.class);
             Assert.assertTrue(appsWrapper != null);
             Assert.assertTrue(clusterMetrics != null);
             Assert.assertTrue(schedulerWrapper != null);

http://git-wip-us.apache.org/repos/asf/eagle/blob/0cda01b5/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
index 5b4e39a..ab641d9 100644
--- 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
@@ -19,7 +19,6 @@
  */
 package org.apache.eagle.jpm.util.resourcefetch;
 
-import com.fasterxml.jackson.databind.util.ContainerBuilder;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.common.DateTimeUtil;
 import org.apache.eagle.jpm.util.Constants;
@@ -65,7 +64,7 @@ public class RMResourceFetcher implements 
ResourceFetcher<AppInfo> {
         return selector;
     }
 
-    private List<AppInfo> doFetchApplicationsList(String urlString, 
Constants.CompressionType compressionType) {
+    private List<AppInfo> doFetchApplicationsList(String urlString, 
Constants.CompressionType compressionType) throws Exception {
         List<AppInfo> result = new ArrayList<>();
         InputStream is = null;
         try {
@@ -77,8 +76,6 @@ public class RMResourceFetcher implements 
ResourceFetcher<AppInfo> {
                 result = appWrapper.getApps().getApp();
             }
             LOG.info("Successfully fetched {} AppInfos from {}", 
result.size(), urlString);
-        } catch (Exception e) {
-            LOG.error("Fail to query {} due to {}", urlString, e.getMessage());
         } finally {
             if (is != null) {
                 try {
@@ -135,75 +132,64 @@ public class RMResourceFetcher implements 
ResourceFetcher<AppInfo> {
     private List<AppInfo> doFetchRunningApplicationsList(Constants.JobType 
jobType,
                                                          
Constants.CompressionType compressionType,
                                                          Object... parameter) 
throws Exception {
-        Map<String, AppInfo> result = new HashMap();
-        List<AppInfo> apps = new ArrayList<>();
-        try {
-            String limit = "";
-            int requests = 1;
-            int timeRangePerRequestInMin = 60;
-
-            switch (parameter.length) {
-                case 0 :
-                    String urlString = getRunningJobURL(jobType, null, null, 
null);
-                    return doFetchApplicationsList(urlString, compressionType);
-                case 1 :
-                    limit = String.valueOf(parameter[0]);
-                    break;
-                case 2 :
-                    limit = String.valueOf(parameter[0]);
-                    requests = (int) parameter[1];
-                    break;
-                case 3 :
-                    limit = String.valueOf(parameter[0]);
-                    requests = (int) parameter[1];
-                    timeRangePerRequestInMin = (int) parameter[2];
-                    break;
-                default :
-                    throw new InvalidParameterException("parameter list: 
limit, requests, requestTimeRange");
-            }
+        String limit = "";
+        int requests = 1;
+        int timeRangePerRequestInMin = 60;
 
-            if (requests <= 1) {
-                String urlString = getRunningJobURL(jobType, null, null, 
limit);
+        switch (parameter.length) {
+            case 0 :
+                String urlString = getRunningJobURL(jobType, null, null, null);
                 return doFetchApplicationsList(urlString, compressionType);
-            }
+            case 1 :
+                limit = String.valueOf(parameter[0]);
+                break;
+            case 2 :
+                limit = String.valueOf(parameter[0]);
+                requests = (int) parameter[1];
+                break;
+            case 3 :
+                limit = String.valueOf(parameter[0]);
+                requests = (int) parameter[1];
+                timeRangePerRequestInMin = (int) parameter[2];
+                break;
+            default :
+                throw new InvalidParameterException("parameter list: limit, 
requests, requestTimeRange");
+        }
+
+        if (requests <= 1) {
+            String urlString = getRunningJobURL(jobType, null, null, limit);
+            return doFetchApplicationsList(urlString, compressionType);
+        }
 
-            long interval =  timeRangePerRequestInMin * DateTimeUtil.ONEMINUTE;
-            long currentTime = System.currentTimeMillis() - interval;
+        long interval =  timeRangePerRequestInMin * DateTimeUtil.ONEMINUTE;
+        long currentTime = System.currentTimeMillis() - interval;
 
-            List<String> requestUrls = new ArrayList<>();
-            requestUrls.add(getRunningJobURL(jobType, 
String.valueOf(currentTime), null, limit));
+        List<String> requestUrls = new ArrayList<>();
+        requestUrls.add(getRunningJobURL(jobType, String.valueOf(currentTime), 
null, limit));
 
-            for (int cnt = 2; cnt < requests; cnt++) {
-                long start = currentTime - interval;
-                requestUrls.add(getRunningJobURL(jobType, 
String.valueOf(start), String.valueOf(currentTime), limit));
-                currentTime -= interval;
-            }
+        for (int cnt = 2; cnt < requests; cnt++) {
+            long start = currentTime - interval;
+            requestUrls.add(getRunningJobURL(jobType, String.valueOf(start), 
String.valueOf(currentTime), limit));
+            currentTime -= interval;
+        }
 
-            requestUrls.add(getRunningJobURL(jobType, null, 
String.valueOf(currentTime), limit));
-            LOG.info("{} requests to fetch running MapReduce applications: 
\n{}", requestUrls.size(),
-                    StringUtils.join(requestUrls, "\n"));
+        requestUrls.add(getRunningJobURL(jobType, null, 
String.valueOf(currentTime), limit));
+        LOG.info("{} requests to fetch running MapReduce applications: \n{}", 
requestUrls.size(),
+                StringUtils.join(requestUrls, "\n"));
 
-            requestUrls.forEach(query ->
-                doFetchApplicationsList(query, compressionType).forEach(app -> 
result.put(app.getId(), app))
-            );
-        } catch (Exception e) {
-            LOG.error("Catch an exception when query url{} : {}", 
selector.getSelectedUrl(), e.getMessage(), e);
-            return apps;
+        Map<String, AppInfo> result = new HashMap();
+        for (String query : requestUrls) {
+            doFetchApplicationsList(query, compressionType).forEach(app -> 
result.put(app.getId(), app));
         }
+        List<AppInfo> apps = new ArrayList<>();
         apps.addAll(result.values());
         return apps;
     }
 
     private List<AppInfo> 
doFetchAcceptedApplicationList(Constants.CompressionType compressionType,
                                                          Object... parameter) 
throws Exception {
-        List<AppInfo> apps = new ArrayList<>();
-        try {
-            String url = getAcceptedAppURL(parameter);
-            return doFetchApplicationsList(url, compressionType);
-        } catch (Exception e) {
-            LOG.error("Catch an exception when query {} : {}", 
selector.getSelectedUrl(), e.getMessage(), e);
-        }
-        return apps;
+        String url = getAcceptedAppURL(parameter);
+        return doFetchApplicationsList(url, compressionType);
     }
 
     private List<AppInfo> getResource(Constants.ResourceType resourceType, 
Constants.CompressionType compressionType, Object... parameter) throws 
Exception {

http://git-wip-us.apache.org/repos/asf/eagle/blob/0cda01b5/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImpl.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImpl.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImpl.java
index fff7a1b..1f49cc1 100644
--- 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImpl.java
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.jpm.util.resourcefetch.ha;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils;
 import org.apache.eagle.jpm.util.resourcefetch.url.RmActiveTestURLBuilderImpl;
@@ -38,6 +39,7 @@ public class HAURLSelectorImpl implements HAURLSelector {
     private final Constants.CompressionType compressionType;
     private static final long MAX_RETRY_TIME = 2;
     private static final Logger LOG = 
LoggerFactory.getLogger(HAURLSelectorImpl.class);
+    private static ObjectMapper OBJ_MAPPER = new ObjectMapper();
 
     public HAURLSelectorImpl(String[] urls, Constants.CompressionType 
compressionType) {
         this.urls = urls;

Reply via email to