Repository: eagle
Updated Branches:
  refs/heads/master af9b056dc -> 43bd197d6


[EAGLE-1024] Monitor jobs with high RPC throughput

https://issues.apache.org/jira/browse/EAGLE-1024

* add job RPC data in MAP_REDUCE_JOB_STREAM
* refactor org.apache.eagle.jpm.analyzer.publisher.EmailPublisher
* support new config 'application.analyzerReport.alertLevel' to define alert level

Author: Zhao, Qingwen <qingwz...@apache.org>

Closes #938 from qingwen220/EAGLE-1024.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/43bd197d
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/43bd197d
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/43bd197d

Branch: refs/heads/master
Commit: 43bd197d6a2fdbb3781cf7d91928be404ae941aa
Parents: af9b056
Author: Zhao, Qingwen <qingwz...@apache.org>
Authored: Fri May 19 18:07:53 2017 +0800
Committer: Zhao, Qingwen <qingwz...@apache.org>
Committed: Fri May 19 18:07:53 2017 +0800

----------------------------------------------------------------------
 .../org/apache/eagle/alert/utils/JsonUtils.java |   3 +-
 eagle-jpm/eagle-jpm-analyzer/pom.xml            |   6 +
 .../apache/eagle/jpm/analyzer/JobAnalyzer.java  |   3 +-
 .../meta/model/MapReduceAnalyzerEntity.java     |   9 ++
 .../analyzer/mr/MRJobPerformanceAnalyzer.java   |   9 +-
 .../jpm/analyzer/mr/rpc/JobRpcEvaluator.java    | 124 ++++++++++++++++++
 .../analyzer/publisher/EagleStorePublisher.java |   4 +-
 .../jpm/analyzer/publisher/EmailPublisher.java  |  66 ++++++----
 .../eagle/jpm/analyzer/publisher/Result.java    |  33 ++---
 .../eagle/jpm/analyzer/EmailPublisherTest.java  |  80 ++++++++++++
 .../apache/eagle/jpm/analyzer/ResultTest.java   |  38 ++++++
 .../src/test/resources/application.conf         |  44 +++++++
 .../mr/historyentity/JPAEntityRepository.java   |   1 +
 .../historyentity/JobRpcAnalysisAPIEntity.java  | 128 +++++++++++++++++++
 .../historyentity/JobSuggestionAPIEntity.java   |   2 +-
 .../jpm/mr/history/MRHistoryJobApplication.java |   9 +-
 .../history/crawler/JHFCrawlerDriverImpl.java   |   2 +-
 .../metrics/JobCounterMetricsGenerator.java     |   1 -
 .../parser/JobEntityLifecycleAggregator.java    |   4 +-
 .../history/parser/JobSuggestionListener.java   |  30 ++++-
 .../JobRpcAnalysisStreamPublisher.java          |  55 ++++++++
 .../history/publisher/JobStreamPublisher.java   |   1 -
 ....history.MRHistoryJobApplicationProvider.xml |  34 +++--
 .../jpm/spark/history/crawl/JHFSparkParser.java |   2 +-
 .../org/apache/eagle/jpm/util/Constants.java    |   3 +-
 .../org/apache/eagle/jpm/util/MRJobTagName.java |   1 -
 .../eagle/jpm/util/jobcounter/JobCounters.java  |  21 ++-
 27 files changed, 633 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/JsonUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/JsonUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/JsonUtils.java
index cc75d34..c5a57f6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/JsonUtils.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/JsonUtils.java
@@ -17,6 +17,7 @@
 package org.apache.eagle.alert.utils;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang.StringUtils;
 import org.codehaus.jettison.json.JSONArray;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,7 +49,7 @@ public class JsonUtils {
                 }
             }
         } catch (Exception e) {
-            LOG.warn("exception found {}", e);
+            LOG.warn("illegal json array message: {}, ignored", 
StringUtils.abbreviate(message, 50));
         }
 
         return result;

http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-analyzer/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/pom.xml b/eagle-jpm/eagle-jpm-analyzer/pom.xml
index a2943df..729397c 100644
--- a/eagle-jpm/eagle-jpm-analyzer/pom.xml
+++ b/eagle-jpm/eagle-jpm-analyzer/pom.xml
@@ -60,5 +60,11 @@
             <artifactId>eagle-metadata-jdbc</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>dumbster</groupId>
+            <artifactId>dumbster</artifactId>
+            <version>1.6</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
index 1e9c00e..1028c35 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
@@ -18,6 +18,7 @@
 package org.apache.eagle.jpm.analyzer;
 
 import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
 
 /**
  * Each JobAnalyzer contains one or more Evaluators to analyze each job.
@@ -26,5 +27,5 @@ import 
org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
  *
  */
 public interface JobAnalyzer<T extends AnalyzerEntity> {
-    void analyze(T analyzerEntity) throws Exception;
+    Result analyze(T analyzerEntity) throws Exception;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/MapReduceAnalyzerEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/MapReduceAnalyzerEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/MapReduceAnalyzerEntity.java
index cd6249d..604c7f4 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/MapReduceAnalyzerEntity.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/MapReduceAnalyzerEntity.java
@@ -41,6 +41,7 @@ public class MapReduceAnalyzerEntity extends AnalyzerEntity {
     private Map<String, TaskExecutionAPIEntity> tasksMap;
     private Map<String, TaskAttemptExecutionAPIEntity> 
completedTaskAttemptsMap;
     private Configuration jobConf;
+    private String trackingUrl;
 
     public MapReduceAnalyzerEntity() {
         this.setEndTime(-1);
@@ -171,4 +172,12 @@ public class MapReduceAnalyzerEntity extends 
AnalyzerEntity {
         this.jobConf = jobConf;
     }
 
+    public String getTrackingUrl() {
+        return trackingUrl;
+    }
+
+    public void setTrackingUrl(String trackingUrl) {
+        this.trackingUrl = trackingUrl;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
index cca6b18..0efa323 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
@@ -21,6 +21,7 @@ import com.typesafe.config.Config;
 import org.apache.eagle.jpm.analyzer.*;
 import org.apache.eagle.jpm.analyzer.Evaluator;
 import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.mr.rpc.JobRpcEvaluator;
 import org.apache.eagle.jpm.analyzer.mr.sla.SLAJobEvaluator;
 import org.apache.eagle.jpm.analyzer.mr.suggestion.JobSuggestionEvaluator;
 import org.apache.eagle.jpm.analyzer.publisher.EagleStorePublisher;
@@ -42,13 +43,12 @@ public class MRJobPerformanceAnalyzer<T extends 
AnalyzerEntity> implements JobAn
     private List<Evaluator> evaluators = new ArrayList<>();
     private List<Publisher> publishers = new ArrayList<>();
 
-    private Config config;
     private AlertDeduplicator alertDeduplicator;
 
     public MRJobPerformanceAnalyzer(Config config) {
-        this.config = config;
         evaluators.add(new SLAJobEvaluator(config));
         evaluators.add(new JobSuggestionEvaluator(config));
+        evaluators.add(new JobRpcEvaluator());
 
         publishers.add(new EagleStorePublisher(config));
         publishers.add(new EmailPublisher(config));
@@ -57,7 +57,7 @@ public class MRJobPerformanceAnalyzer<T extends 
AnalyzerEntity> implements JobAn
     }
 
     @Override
-    public void analyze(T analyzerJobEntity) throws Exception {
+    public Result analyze(T analyzerJobEntity) throws Exception {
         Result result = new Result();
 
         for (Evaluator evaluator : evaluators) {
@@ -73,11 +73,12 @@ public class MRJobPerformanceAnalyzer<T extends 
AnalyzerEntity> implements JobAn
 
         if (alertDeduplicator.dedup(analyzerJobEntity, result)) {
             LOG.info("skip publish job {} alert because it is duplicated", 
analyzerJobEntity.getJobId());
-            return;
+            return null;
         }
 
         for (Publisher publisher : publishers) {
             publisher.publish(analyzerJobEntity, result);
         }
+        return result;
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java
new file mode 100644
index 0000000..457f0c5
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java
@@ -0,0 +1,124 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.rpc;
+
+import org.apache.eagle.jpm.analyzer.Evaluator;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.mr.historyentity.JobRpcAnalysisAPIEntity;
+import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.eagle.jpm.util.MRJobTagName.*;
+
+public class JobRpcEvaluator implements Evaluator<MapReduceAnalyzerEntity>, 
Serializable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(JobRpcEvaluator.class);
+
+    @Override
+    public Result.EvaluatorResult evaluate(MapReduceAnalyzerEntity entity) {
+        try {
+            double totalMapHdfsOps = 0;
+            double totalReduceHdfsOps = 0;
+
+            if (entity.getFinishedMaps() != 0) {
+                totalMapHdfsOps = getTotalHdfsOps(entity.getMapCounters());
+            }
+            if (entity.getFinishedReduces() != 0) {
+                totalReduceHdfsOps = 
getTotalHdfsOps(entity.getReduceCounters());
+            }
+
+            long mapStartTime = Long.MAX_VALUE;
+            long mapEndTime = 0;
+            long reduceStartTime = Long.MAX_VALUE;
+            long reduceEndTime = 0;
+
+            for (TaskExecutionAPIEntity task : entity.getTasksMap().values()) {
+                if 
(task.getTags().get(TASK_TYPE.toString()).equalsIgnoreCase(Constants.TaskType.MAP.toString()))
 {
+                    if (mapStartTime > task.getStartTime()) {
+                        mapStartTime = task.getStartTime();
+                    }
+                    if (mapEndTime < task.getEndTime()) {
+                        mapEndTime = task.getEndTime();
+                    }
+                } else {
+                    if (reduceStartTime > task.getStartTime()) {
+                        reduceStartTime = task.getStartTime();
+                    }
+                    if (reduceEndTime < task.getEndTime()) {
+                        reduceEndTime = task.getEndTime();
+                    }
+                }
+            }
+
+            Map<String, String> tags = new HashMap<>();
+            tags.put(SITE.toString(), entity.getSiteId());
+            tags.put(USER.toString(), entity.getUserId());
+            tags.put(JOB_QUEUE.toString(), entity.getJobQueueName());
+            tags.put(JOD_DEF_ID.toString(), entity.getJobDefId());
+            tags.put(JOB_TYPE.toString(), entity.getJobType());
+            tags.put(JOB_ID.toString(), entity.getJobId());
+            JobRpcAnalysisAPIEntity analysisAPIEntity = new 
JobRpcAnalysisAPIEntity();
+            analysisAPIEntity.setTags(tags);
+            analysisAPIEntity.setTimestamp(entity.getStartTime());
+            analysisAPIEntity.setTrackingUrl(entity.getTrackingUrl());
+
+            double totalOpsPerSecond = (entity.getDurationTime() == 0) ? 0 :
+                    (totalMapHdfsOps + totalReduceHdfsOps) / 
(entity.getDurationTime() / 1000);
+            double mapOpsPerSecond = (entity.getTotalMaps() == 0) ? 0 :
+                    totalMapHdfsOps / ((mapEndTime - mapStartTime) / 1000);
+            double reduceOpsPerSecond = (entity.getTotalReduces() == 0) ? 0 :
+                    totalReduceHdfsOps / ((reduceEndTime - reduceStartTime) / 
1000);
+            
+            double avgOpsPerTask = (totalMapHdfsOps + totalReduceHdfsOps) / 
(entity.getTotalMaps() + entity.getTotalReduces());
+            double avgOpsPerMap = (entity.getTotalMaps() == 0) ? 0 :
+                    totalMapHdfsOps / entity.getTotalMaps();
+            double avgOpsPerReduce = (entity.getTotalReduces() == 0) ? 0 :
+                    totalReduceHdfsOps / entity.getTotalReduces();
+
+            analysisAPIEntity.setTotalOpsPerSecond(totalOpsPerSecond);
+            analysisAPIEntity.setMapOpsPerSecond(mapOpsPerSecond);
+            analysisAPIEntity.setReduceOpsPerSecond(reduceOpsPerSecond);
+            analysisAPIEntity.setAvgOpsPerTask(avgOpsPerTask);
+            analysisAPIEntity.setAvgOpsPerMap(avgOpsPerMap);
+            analysisAPIEntity.setAvgOpsPerReduce(avgOpsPerReduce);
+
+            Result.EvaluatorResult result = new Result.EvaluatorResult();
+            result.addProcessorEntity(JobRpcEvaluator.class, 
analysisAPIEntity);
+
+            return result;
+        } catch (Exception e) {
+            LOG.error("Rpc analysis failed for job {} due to {}", 
entity.getJobId(), e.getMessage());
+            return null;
+        }
+
+    }
+
+    private long getTotalHdfsOps(JobCounters counter) {
+        long mapHdfsReadOps = 
counter.getCounterValue(JobCounters.CounterName.HDFS_READ_OPS);
+        long mapHdfsWriteOps = 
counter.getCounterValue(JobCounters.CounterName.HDFS_WRITE_OPS);
+        return  mapHdfsReadOps + mapHdfsWriteOps;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
index 1c5a033..349a611 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
@@ -44,11 +44,11 @@ public class EagleStorePublisher implements Publisher, 
Serializable {
 
     @Override
     public void publish(AnalyzerEntity analyzerJobEntity, Result result) {
-        if (result.getAlertMessages().size() == 0) {
+        if (result.getAlertEntities().size() == 0) {
             return;
         }
 
-        LOG.info("EagleStorePublisher gets job {}", 
analyzerJobEntity.getJobDefId());
+        LOG.info("EagleStorePublisher gets job {}", 
analyzerJobEntity.getJobId());
 
         try {
             this.client = new EagleServiceClientImpl(config);

http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
index 4ec946d..f495cf8 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
@@ -28,8 +28,10 @@ import org.apache.eagle.common.mail.AlertEmailContext;
 import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity;
 import org.apache.eagle.jpm.analyzer.mr.sla.SLAJobEvaluator;
+import org.apache.eagle.jpm.analyzer.mr.suggestion.JobSuggestionEvaluator;
 import org.apache.eagle.jpm.analyzer.util.Constants;
 import org.apache.eagle.jpm.analyzer.util.Utils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,6 +40,7 @@ import java.util.*;
 
 public class EmailPublisher implements Publisher, Serializable {
     private static final Logger LOG = 
LoggerFactory.getLogger(EmailPublisher.class);
+    private static final String ALERT_LEVEL_PATH = 
"application.analyzerReport.alertLevel";
 
     private Config config;
     private List<Class<?>> publishResult4Evaluators = new ArrayList<>();
@@ -47,6 +50,8 @@ public class EmailPublisher implements Publisher, 
Serializable {
         this.config = config;
         //TODO
         publishResult4Evaluators.add(SLAJobEvaluator.class);
+        publishResult4Evaluators.add(JobSuggestionEvaluator.class);
+
         timeZone = 
TimeZone.getTimeZone((config.hasPath(EagleConfigConstants.EAGLE_TIME_ZONE)
                 ? config.getString(EagleConfigConstants.EAGLE_TIME_ZONE)
                 : EagleConfigConstants.DEFAULT_EAGLE_TIME_ZONE));
@@ -60,11 +65,34 @@ public class EmailPublisher implements Publisher, 
Serializable {
             return;
         }
 
-        Map<String, List<Result.ProcessorResult>> extend = new HashMap<>();
+        Map<String, List<Result.ProcessorResult>> targetResult = new 
HashMap<>();
         publishResult4Evaluators
                 .stream()
                 .filter(item -> 
result.getAlertMessages().containsKey(item.getName()))
-                .forEach(item -> extend.put(item.getName(), 
result.getAlertMessages().get(item.getName())));
+                .forEach(item -> targetResult.put(item.getName(), 
result.getAlertMessages().get(item.getName())));
+
+        Map<String, List<Result.ProcessorResult>> extend = new HashMap<>();
+
+        Result.ResultLevel serverAlertLevel = getServerAlertLevel();
+        Result.ResultLevel jobAlertLevel = serverAlertLevel;
+        for (String evaluator : targetResult.keySet()) {
+            List<Result.ProcessorResult> alertMessages = new ArrayList<>();
+            for (Result.ProcessorResult message : targetResult.get(evaluator)) 
{
+                if (isHigherAlertLevel(message.getResultLevel(), 
serverAlertLevel)) {
+                    alertMessages.add(message);
+
+                    if (isHigherAlertLevel(message.getResultLevel(), 
jobAlertLevel)) {
+                        jobAlertLevel = message.getResultLevel();
+                    }
+
+                    LOG.info("Job [{}] Got Message [{}], Level [{}] By 
Evaluator [{}]",
+                            analyzerJobEntity.getJobId(), 
message.getMessage(), message.getResultLevel(), evaluator);
+                }
+            }
+            if (!alertMessages.isEmpty()) {
+                extend.put(evaluator, alertMessages);
+            }
+        }
 
         if (extend.size() == 0) {
             return;
@@ -82,19 +110,12 @@ public class EmailPublisher implements Publisher, 
Serializable {
         basic.put("end", analyzerJobEntity.getEndTime() == 0
                 ? "0"
                 : 
DateTimeUtil.secondsToHumanDate(analyzerJobEntity.getEndTime() / 1000, 
timeZone));
-        double progress = 
analyzerJobEntity.getCurrentState().equalsIgnoreCase(org.apache.eagle.jpm.util.Constants.JobState.RUNNING.toString())
 ? analyzerJobEntity.getProgress() : 100;
+        double progress = 
org.apache.eagle.jpm.util.Constants.JobState.RUNNING.toString().equalsIgnoreCase(analyzerJobEntity.getCurrentState())
 ? analyzerJobEntity.getProgress() : 100;
         basic.put("progress", progress + "%");
         basic.put("detail", getJobLink(analyzerJobEntity));
 
         Map<String, Object> alertData = new HashMap<>();
-        for (String evaluator : extend.keySet()) {
-            for (Result.ProcessorResult message : extend.get(evaluator)) {
-                setAlertLevel(alertData, message.getResultLevel());
-                LOG.info("Job [{}] Got Message [{}], Level [{}] By Evaluator 
[{}]",
-                        analyzerJobEntity.getJobDefId(), message.getMessage(), 
message.getResultLevel(), evaluator);
-            }
-        }
-
+        alertData.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, 
jobAlertLevel.toString());
         alertData.put(Constants.ANALYZER_REPORT_DATA_BASIC_KEY, basic);
         alertData.put(Constants.ANALYZER_REPORT_DATA_EXTEND_KEY, extend);
         Config cloneConfig = ConfigFactory.empty().withFallback(config);
@@ -124,18 +145,19 @@ public class EmailPublisher implements Publisher, 
Serializable {
                 + analyzerJobEntity.getJobId();
     }
 
-    private void setAlertLevel(Map<String, Object> alertData, 
Result.ResultLevel level) {
-        if 
(!alertData.containsKey(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY)) {
-            alertData.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, 
Result.ResultLevel.INFO.toString());
-        }
-
-        if (level.equals(Result.ResultLevel.CRITICAL)) {
-            alertData.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, 
level.toString());
+    private Result.ResultLevel getServerAlertLevel() {
+        Result.ResultLevel alertLevel = Result.ResultLevel.WARNING;
+        if (config.hasPath(ALERT_LEVEL_PATH)) {
+            try {
+                alertLevel = 
Result.ResultLevel.fromString(config.getString(ALERT_LEVEL_PATH));
+            } catch (Exception e) {
+                LOG.warn("invalid alert level config: {}, using WARNING level 
instead", config.getString(ALERT_LEVEL_PATH));
+            }
         }
+        return alertLevel;
+    }
 
-        if (level.equals(Result.ResultLevel.WARNING)
-                && 
!alertData.get(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY).equals(Result.ResultLevel.CRITICAL.toString()))
 {
-            alertData.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, 
level.toString());
-        }
+    private boolean isHigherAlertLevel(Result.ResultLevel level, 
Result.ResultLevel serverLevel) {
+        return level.ordinal() >= serverLevel.ordinal();
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
index 27da4b9..700fae4 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
@@ -35,23 +35,22 @@ public class Result {
         Map<Class<?>, ProcessorResult> processorResults = 
result.getProcessorResults();
         Map<Class<?>, TaggedLogAPIEntity> processorEntities = 
result.getProcessorEntities();
 
+        String typeName = type.getName();
+        if (!processorEntities.values().isEmpty()) {
+            alertEntities.put(typeName, new ArrayList<>());
+            alertEntities.get(typeName).addAll(processorEntities.values());
+        }
+
         for (Class<?> processorType : processorResults.keySet()) {
             ProcessorResult processorResult = 
processorResults.get(processorType);
 
-            if (processorResult.resultLevel.equals(ResultLevel.NONE)) {
-                continue;
-            }
-
-            String typeName = type.getName();
+            normalizeResult(processorResult);
             if (!alertMessages.containsKey(typeName)) {
                 alertMessages.put(typeName, new ArrayList<>());
-                alertEntities.put(typeName, new ArrayList<>());
             }
-            normalizeResult(processorResult);
             alertMessages.get(typeName).add(processorResult);
-            
alertEntities.get(typeName).add(processorEntities.get(processorType));
-
         }
+
     }
 
     public Map<String, List<ProcessorResult>> getAlertMessages() {
@@ -80,15 +79,17 @@ public class Result {
         WARNING,
         CRITICAL;
 
-        private static final Map<String, ResultLevel> stringToLevels = new 
HashMap<>();
-        static {
-            for (ResultLevel level : values()) {
-                stringToLevels.put(level.toString(), level);
-            }
+        public static ResultLevel fromString(String levelString) {
+            return ResultLevel.valueOf(levelString);
         }
 
-        public static ResultLevel fromString(String levelString) {
-            return stringToLevels.get(levelString);
+        public static boolean contains(String levelString) {
+            for (ResultLevel level : ResultLevel.values()) {
+                if (level.name().equals(levelString)) {
+                    return true;
+                }
+            }
+            return false;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-analyzer/src/test/java/org/apache/eagle/jpm/analyzer/EmailPublisherTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/test/java/org/apache/eagle/jpm/analyzer/EmailPublisherTest.java b/eagle-jpm/eagle-jpm-analyzer/src/test/java/org/apache/eagle/jpm/analyzer/EmailPublisherTest.java
new file mode 100644
index 0000000..e7188d7
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/test/java/org/apache/eagle/jpm/analyzer/EmailPublisherTest.java
@@ -0,0 +1,80 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer;
+
+import com.dumbster.smtp.SimpleSmtpServer;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.mr.sla.SLAJobEvaluator;
+import org.apache.eagle.jpm.analyzer.mr.suggestion.JobSuggestionEvaluator;
+import 
org.apache.eagle.jpm.analyzer.mr.suggestion.MapReduceCompressionSettingProcessor;
+import org.apache.eagle.jpm.analyzer.mr.suggestion.MapReduceDataSkewProcessor;
+import org.apache.eagle.jpm.analyzer.publisher.EmailPublisher;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EmailPublisherTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(EmailPublisherTest.class);
+    private static final int SMTP_PORT = 5025;
+    private SimpleSmtpServer server;
+
+    @Before
+    public void setUp() {
+        server = SimpleSmtpServer.start(SMTP_PORT);
+    }
+
+    @After
+    public void clear() {
+        if (server != null) {
+            server.stop();
+        }
+    }
+
+    @Test
+    public void test() {
+        AnalyzerEntity analyzerJobEntity = new AnalyzerEntity();
+        analyzerJobEntity.setJobId("job1");
+
+        Result.EvaluatorResult jobSuggestionResult = new 
Result.EvaluatorResult();
+        
jobSuggestionResult.addProcessorResult(MapReduceCompressionSettingProcessor.class,
 new Result.ProcessorResult(Result.RuleType.COMPRESS, Result.ResultLevel.INFO, 
"compress"));
+        
jobSuggestionResult.addProcessorResult(MapReduceDataSkewProcessor.class, new 
Result.ProcessorResult(Result.RuleType.DATA_SKEW, Result.ResultLevel.WARNING, 
"data skew"));
+
+        Result.EvaluatorResult jobSlaResult = new Result.EvaluatorResult();
+        jobSlaResult.addProcessorResult(Processor.class, new 
Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, 
Result.ResultLevel.CRITICAL, "long running job"));
+
+        Result result = new Result();
+        result.addEvaluatorResult(JobSuggestionEvaluator.class, 
jobSuggestionResult);
+        result.addEvaluatorResult(SLAJobEvaluator.class, jobSlaResult);
+
+        Config config = ConfigFactory.load();
+        EmailPublisher publisher = new EmailPublisher(config);
+        publisher.publish(analyzerJobEntity, result);
+
+        Assert.assertTrue(server.getReceivedEmailSize() == 1 );
+        Assert.assertTrue(server.getReceivedEmail().hasNext());
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-analyzer/src/test/java/org/apache/eagle/jpm/analyzer/ResultTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/test/java/org/apache/eagle/jpm/analyzer/ResultTest.java b/eagle-jpm/eagle-jpm-analyzer/src/test/java/org/apache/eagle/jpm/analyzer/ResultTest.java
new file mode 100644
index 0000000..2185349
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/test/java/org/apache/eagle/jpm/analyzer/ResultTest.java
@@ -0,0 +1,38 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer;
+
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks name lookup and ordering of {@link Result.ResultLevel}.
+ */
+public class ResultTest {
+
+    /**
+     * Verifies that level names are matched case-sensitively, that a valid name
+     * resolves to its constant, and that CRITICAL ranks above INFO.
+     */
+    @Test
+    public void testResultLevel() {
+        Assert.assertFalse(Result.ResultLevel.contains("info"));
+        Assert.assertTrue(Result.ResultLevel.contains("INFO"));
+
+        // assertEquals prints expected/actual on failure instead of a bare "false"
+        Assert.assertEquals(Result.ResultLevel.INFO, Result.ResultLevel.fromString("INFO"));
+
+        // pins INFO's position in the declaration order
+        Assert.assertEquals(1, Result.ResultLevel.INFO.ordinal());
+        Assert.assertTrue(Result.ResultLevel.CRITICAL.ordinal() > Result.ResultLevel.INFO.ordinal());
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-analyzer/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/test/resources/application.conf 
b/eagle-jpm/eagle-jpm-analyzer/src/test/resources/application.conf
new file mode 100644
index 0000000..a66e396
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/test/resources/application.conf
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+service {
+  env = "testing"
+  host = "localhost"
+  port = 9090
+  username = "admin"
+  password = "secret"
+  readTimeOutSeconds = 60
+  context = "/rest"
+  timezone = "UTC"
+}
+
+# Test-only settings for the analyzer module.
+# NOTE(review): presumably picked up by ConfigFactory.load() in EmailPublisherTest - confirm.
+application {
+
+  # SMTP connection used when sending report emails.
+  # NOTE(review): host/port presumably must match the SMTP server started by the test - confirm.
+  mailService {
+    mailSmtpServer = "localhost",
+    mailSmtpPort = 5025,
+    mailSmtpAuth = "false"
+    //mailSmtpConn = "plaintext",
+    //mailSmtpUsername = ""
+    //mailSmtpPassword = ""
+    //mailSmtpDebug = false
+  }
+  # Analyzer report email settings; alertLevel defines the alert level.
+  # NOTE(review): alertLevel is presumably a Result.ResultLevel name - confirm.
+  analyzerReport {
+    sender: "a...@example.com"
+    recipients: "a...@example.com"
+    template: "AnalyzerReportTemplate.vm"
+    alertLevel: "CRITICAL"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java
 
b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java
index 54383bd..936aa4e 100644
--- 
a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java
+++ 
b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java
@@ -40,5 +40,6 @@ public class JPAEntityRepository extends EntityRepository {
         entitySet.add(JobCountEntity.class);
         entitySet.add(TaskAttemptErrorCategoryEntity.class);
         entitySet.add(JobSuggestionAPIEntity.class);
+        entitySet.add(JobRpcAnalysisAPIEntity.class);
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobRpcAnalysisAPIEntity.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobRpcAnalysisAPIEntity.java
 
b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobRpcAnalysisAPIEntity.java
new file mode 100644
index 0000000..6c0e539
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobRpcAnalysisAPIEntity.java
@@ -0,0 +1,128 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.historyentity;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+
+import static 
org.apache.eagle.jpm.util.Constants.MR_JOB_RPC_ANALYSIS_SERVICE_NAME;
+
+/**
+ * Entity carrying per-job RPC analysis figures: operations per second (total,
+ * map, reduce) and average operations per task (all, map, reduce), together
+ * with the job's tracking URL and current state.
+ *
+ * <p>Mapped to table {@code eaglejpa_analysis}, column family {@code f}, and
+ * exposed under service {@code MR_JOB_RPC_ANALYSIS_SERVICE_NAME}. Each setter
+ * stores the value and then calls {@code valueChanged} with the field name.
+ */
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@Table("eaglejpa_analysis")
+@ColumnFamily("f")
+// NOTE(review): "counter" is a generic prefix; JobSuggestionAPIEntity in the same
+// table uses "jsuggestion" - confirm this prefix does not clash with another entity.
+@Prefix("counter")
+@Service(MR_JOB_RPC_ANALYSIS_SERVICE_NAME)
+@TimeSeries(true)
+@Partition({"site"})
+@Indexes({
+        @Index(name = "Index_1_jobId", columns = { "jobId" }, unique = true),
+        @Index(name = "Index_2_jobDefId", columns = { "jobDefId" }, unique = 
false)
+        })
+public class JobRpcAnalysisAPIEntity extends TaggedLogAPIEntity {
+    // Column qualifiers "a".."h" are the stored names of the fields below.
+    // NOTE(review): the "ops" are presumably RPC/HDFS operations counted by
+    // JobRpcEvaluator - confirm against the evaluator.
+    @Column("a")
+    private String trackingUrl;
+    @Column("b")
+    private String currentState;
+    @Column("c")
+    private double totalOpsPerSecond;
+    @Column("d")
+    private double mapOpsPerSecond;
+    @Column("e")
+    private double reduceOpsPerSecond;
+    @Column("f")
+    private double avgOpsPerTask;
+    @Column("g")
+    private double avgOpsPerMap;
+    @Column("h")
+    private double avgOpsPerReduce;
+
+    public String getTrackingUrl() {
+        return trackingUrl;
+    }
+
+    public void setTrackingUrl(String trackingUrl) {
+        this.trackingUrl = trackingUrl;
+        valueChanged("trackingUrl");
+    }
+
+    public String getCurrentState() {
+        return currentState;
+    }
+
+    public void setCurrentState(String currentState) {
+        this.currentState = currentState;
+        valueChanged("currentState");
+    }
+
+    public double getTotalOpsPerSecond() {
+        return totalOpsPerSecond;
+    }
+
+    public void setTotalOpsPerSecond(double totalOpsPerSecond) {
+        this.totalOpsPerSecond = totalOpsPerSecond;
+        valueChanged("totalOpsPerSecond");
+    }
+
+    public double getMapOpsPerSecond() {
+        return mapOpsPerSecond;
+    }
+
+    public void setMapOpsPerSecond(double mapOpsPerSecond) {
+        this.mapOpsPerSecond = mapOpsPerSecond;
+        valueChanged("mapOpsPerSecond");
+    }
+
+    public double getReduceOpsPerSecond() {
+        return reduceOpsPerSecond;
+    }
+
+    public void setReduceOpsPerSecond(double reduceOpsPerSecond) {
+        this.reduceOpsPerSecond = reduceOpsPerSecond;
+        valueChanged("reduceOpsPerSecond");
+    }
+
+    public double getAvgOpsPerTask() {
+        return avgOpsPerTask;
+    }
+
+    public void setAvgOpsPerTask(double avgOpsPerTask) {
+        this.avgOpsPerTask = avgOpsPerTask;
+        valueChanged("avgOpsPerTask");
+    }
+
+    public double getAvgOpsPerMap() {
+        return avgOpsPerMap;
+    }
+
+    public void setAvgOpsPerMap(double avgOpsPerMap) {
+        this.avgOpsPerMap = avgOpsPerMap;
+        valueChanged("avgOpsPerMap");
+    }
+
+    public double getAvgOpsPerReduce() {
+        return avgOpsPerReduce;
+    }
+
+    public void setAvgOpsPerReduce(double avgOpsPerReduce) {
+        this.avgOpsPerReduce = avgOpsPerReduce;
+        valueChanged("avgOpsPerReduce");
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobSuggestionAPIEntity.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobSuggestionAPIEntity.java
 
b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobSuggestionAPIEntity.java
index e433672..cc38cc4 100644
--- 
a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobSuggestionAPIEntity.java
+++ 
b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobSuggestionAPIEntity.java
@@ -26,7 +26,7 @@ import 
com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import static 
org.apache.eagle.jpm.util.Constants.MR_JOB_OPTIMIZER_SUGGESTION_SERVICE_NAME;
 
 @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-@Table("eaglejpa")
+@Table("eaglejpa_analysis")
 @ColumnFamily("f")
 @Prefix("jsuggestion")
 @Service(MR_JOB_OPTIMIZER_SUGGESTION_SERVICE_NAME)

http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
index 4d02e91..beb1887 100644
--- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
+++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
@@ -22,11 +22,7 @@ import 
org.apache.eagle.app.environment.impl.StormEnvironment;
 import org.apache.eagle.app.messaging.StormStreamSink;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilterBuilder;
-import 
org.apache.eagle.jpm.mr.history.crawler.JobHistorySpoutCollectorInterceptor;
-import org.apache.eagle.jpm.mr.history.publisher.JobStreamPublisher;
-import org.apache.eagle.jpm.mr.history.publisher.StreamPublisher;
-import org.apache.eagle.jpm.mr.history.publisher.StreamPublisherManager;
-import org.apache.eagle.jpm.mr.history.publisher.TaskAttemptStreamPublisher;
+import org.apache.eagle.jpm.mr.history.publisher.*;
 import org.apache.eagle.jpm.mr.history.storm.JobHistorySpout;
 import org.apache.eagle.jpm.util.Constants;
 
@@ -90,8 +86,9 @@ public class MRHistoryJobApplication extends StormApplication 
{
         taskAttemptKafkaBoltDeclarer.shuffleGrouping(spoutName, 
spoutToTaskAttemptSinkName);
 
         List<StreamPublisher> streamPublishers = new ArrayList<>();
-        streamPublishers.add(new JobStreamPublisher(spoutToJobSinkName));
+        //streamPublishers.add(new JobStreamPublisher(spoutToJobSinkName));
         streamPublishers.add(new 
TaskAttemptStreamPublisher(spoutToTaskAttemptSinkName));
+        streamPublishers.add(new 
JobRpcAnalysisStreamPublisher(spoutToJobSinkName));
         jobHistorySpout.setStreamPublishers(streamPublishers);
 
         return topologyBuilder.createTopology();

http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
index 2e56632..845aa77 100644
--- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
+++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
@@ -119,7 +119,7 @@ public class JHFCrawlerDriverImpl implements 
JHFCrawlerDriver {
                     }
                 }
                 jobHistoryFiles.clear();
-                LOG.info("after filtering, number of job history files " + 
processQueue.size());
+                LOG.info("after filtering, number of job history files " + 
allJobHistoryFiles.size());
             }
 
             Collections.sort(allJobHistoryFiles,

http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java
 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java
index 6020a3b..0f3dd23 100644
--- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java
+++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java
@@ -18,7 +18,6 @@
 
 package org.apache.eagle.jpm.mr.history.metrics;
 
-import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity;
 import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.jpm.util.MRJobTagName;

http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
index ea0ba30..d9f52a3 100644
--- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
+++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
@@ -30,6 +30,8 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.eagle.jpm.util.MRJobTagName.TASK_TYPE;
+
 public class JobEntityLifecycleAggregator implements 
HistoryJobEntityLifecycleListener {
     private static final Logger LOG = 
LoggerFactory.getLogger(JobEntityLifecycleAggregator.class);
     private JobExecutionAPIEntity jobExecutionAPIEntity;
@@ -107,7 +109,7 @@ public class JobEntityLifecycleAggregator implements 
HistoryJobEntityLifecycleLi
 
     private void taskAttemptEntityCreated(TaskAttemptExecutionAPIEntity 
entity) {
         JobCounters jobCounters = entity.getJobCounters();
-        String taskType = entity.getTags().get(Constants.JOB_TASK_TYPE_TAG);
+        String taskType = entity.getTags().get(TASK_TYPE.toString());
 
         if (taskType != null && jobCounters != null && 
jobCounters.getCounters() != null) {
             if 
(Constants.TaskType.MAP.toString().equals(taskType.toUpperCase())) {

http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java
 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java
index b174a2f..67c1ec9 100644
--- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java
+++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java
@@ -20,33 +20,38 @@ package org.apache.eagle.jpm.mr.history.parser;
 import com.typesafe.config.Config;
 import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer;
-import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
-import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
-import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
-import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.analyzer.mr.rpc.JobRpcEvaluator;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.mr.history.publisher.StreamPublisher;
+import org.apache.eagle.jpm.mr.history.publisher.StreamPublisherManager;
+import org.apache.eagle.jpm.mr.historyentity.*;
 import org.apache.eagle.jpm.util.MRJobTagName;
 import org.apache.eagle.jpm.util.jobcounter.JobCounters;
-import org.apache.eagle.jpm.mr.history.parser.JHFEventReaderBase.Keys;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.hadoop.conf.Configuration;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+
 import static org.apache.eagle.jpm.util.MRJobTagName.TASK_ATTEMPT_ID;
 import static org.apache.eagle.jpm.util.MRJobTagName.TASK_ID;
 
 /*
- * JobEventCounterListener provides an interface to add job/task counter 
analyzers
+ * JobSuggestionListener provides an interface to analyze job performance and 
alerts by email
  */
 public class JobSuggestionListener implements HistoryJobEntityCreationListener 
{
     private static final Logger LOG = 
LoggerFactory.getLogger(JobSuggestionListener.class);
 
     private MapReduceAnalyzerEntity info;
     private MRJobPerformanceAnalyzer<MapReduceAnalyzerEntity> analyzer;
+    private StreamPublisher streamPublisher;
 
     public JobSuggestionListener(Config config) {
         this.info = new MapReduceAnalyzerEntity();
         this.analyzer = new MRJobPerformanceAnalyzer<>(config);
+        this.streamPublisher = 
StreamPublisherManager.getInstance().getStreamPublisher(JobRpcAnalysisAPIEntity.class);
     }
 
     @Override
@@ -75,6 +80,7 @@ public class JobSuggestionListener implements 
HistoryJobEntityCreationListener {
             info.setTotalMaps(jobExecutionAPIEntity.getNumTotalMaps());
             info.setTotalReduces(jobExecutionAPIEntity.getNumTotalReduces());
             info.setProgress(100);
+            info.setTrackingUrl(((JobExecutionAPIEntity) 
entity).getTrackingUrl());
         }
     }
 
@@ -90,6 +96,16 @@ public class JobSuggestionListener implements 
HistoryJobEntityCreationListener {
 
     @Override
     public void flush() throws Exception {
-        analyzer.analyze(info);
+        // Analyze first: the analysis runs whether or not a stream publisher exists.
+        Result result = analyzer.analyze(info);
+        if (streamPublisher == null) {
+            // Warn only when the publisher really is missing; previously this
+            // message was also logged when just the analysis result was null.
+            LOG.warn("JobRpcAnalysisStreamPublisher is null");
+            return;
+        }
+        if (result == null) {
+            LOG.debug("job analysis returned no result, nothing to publish");
+            return;
+        }
+        // Forward only the alert entities produced by the RPC evaluator.
+        List<TaggedLogAPIEntity> entities = result.getAlertEntities().get(JobRpcEvaluator.class.getName());
+        if (entities == null) {
+            return;
+        }
+        for (TaggedLogAPIEntity entity : entities) {
+            streamPublisher.flush(entity);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobRpcAnalysisStreamPublisher.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobRpcAnalysisStreamPublisher.java
 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobRpcAnalysisStreamPublisher.java
new file mode 100644
index 0000000..3b91fbf
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobRpcAnalysisStreamPublisher.java
@@ -0,0 +1,55 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.history.publisher;
+
+import org.apache.eagle.dataproc.impl.storm.ValuesArray;
+import org.apache.eagle.jpm.mr.historyentity.JobRpcAnalysisAPIEntity;
+import org.apache.eagle.jpm.util.MRJobTagName;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Stream publisher for {@link JobRpcAnalysisAPIEntity}: flattens an entity into
+ * a field map and emits it on the configured storm stream.
+ */
+public class JobRpcAnalysisStreamPublisher extends StreamPublisher<JobRpcAnalysisAPIEntity> {
+
+    public JobRpcAnalysisStreamPublisher(String stormStreamId) {
+        super(stormStreamId);
+    }
+
+    @Override
+    public Class<?> type() {
+        return JobRpcAnalysisAPIEntity.class;
+    }
+
+    /**
+     * Copies the entity's tags, adds the RPC analysis values on top, and emits
+     * the pair (job id taken from the map, map) through the collector.
+     */
+    @Override
+    public void flush(JobRpcAnalysisAPIEntity entity) {
+        // start from the tags so the job id tag is part of the payload
+        Map<String, Object> payload = new HashMap<>(entity.getTags());
+
+        // job descriptors
+        payload.put("trackingUrl", entity.getTrackingUrl());
+        payload.put("currentState", entity.getCurrentState());
+
+        // throughput figures
+        payload.put("totalOpsPerSecond", entity.getTotalOpsPerSecond());
+        payload.put("mapOpsPerSecond", entity.getMapOpsPerSecond());
+        payload.put("reduceOpsPerSecond", entity.getReduceOpsPerSecond());
+
+        // per-task averages
+        payload.put("avgOpsPerTask", entity.getAvgOpsPerTask());
+        payload.put("avgOpsPerMap", entity.getAvgOpsPerMap());
+        payload.put("avgOpsPerReduce", entity.getAvgOpsPerReduce());
+
+        Object jobId = payload.get(MRJobTagName.JOB_ID.toString());
+        collector.collect(stormStreamId, new ValuesArray(jobId, payload));
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobStreamPublisher.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobStreamPublisher.java
 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobStreamPublisher.java
index 92c415c..a325cbe 100644
--- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobStreamPublisher.java
+++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobStreamPublisher.java
@@ -19,7 +19,6 @@
 package org.apache.eagle.jpm.mr.history.publisher;
 
 import org.apache.eagle.dataproc.impl.storm.ValuesArray;
-import org.apache.eagle.jpm.mr.history.crawler.EagleOutputCollector;
 import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
 import org.apache.eagle.jpm.util.MRJobTagName;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
 
b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
index 5c479a3..01c5e59 100644
--- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
+++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
@@ -192,20 +192,38 @@
                     <type>string</type>
                 </column>
                 <column>
-                    <name>submissionTime</name>
-                    <type>long</type>
-                </column>
-                <column>
                     <name>trackingUrl</name>
                     <type>string</type>
                 </column>
                 <column>
-                    <name>startTime</name>
-                    <type>long</type>
+                    <name>totalOpsPerSecond</name>
+                    <type>double</type>
+                    <defaultValue>0.0</defaultValue>
                 </column>
                 <column>
-                    <name>endTime</name>
-                    <type>long</type>
+                    <name>mapOpsPerSecond</name>
+                    <type>double</type>
+                    <defaultValue>0.0</defaultValue>
+                </column>
+                <column>
+                    <name>reduceOpsPerSecond</name>
+                    <type>double</type>
+                    <defaultValue>0.0</defaultValue>
+                </column>
+                <column>
+                    <name>avgOpsPerTask</name>
+                    <type>double</type>
+                    <defaultValue>0.0</defaultValue>
+                </column>
+                <!-- column names must equal the field keys emitted by JobRpcAnalysisStreamPublisher -->
+                <column>
+                    <name>avgOpsPerMap</name>
+                    <type>double</type>
+                    <defaultValue>0.0</defaultValue>
+                </column>
+                <column>
+                    <name>avgOpsPerReduce</name>
+                    <type>double</type>
+                    <defaultValue>0.0</defaultValue>
                 </column>
                 <column>
                     <name>currentState</name>

http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java
 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java
index 8c12633..420b24c 100644
--- 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java
+++ 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java
@@ -46,7 +46,7 @@ public class JHFSparkParser implements JHFParserBase {
             for (String line = reader.readLine(); line != null; line = 
reader.readLine()) {
                 isValidJson = true;
                 JSONObject eventObj = parseAndValidateJSON(line);
-                if (isValidJson) {
+                if (isValidJson && eventObj != null) {
                     this.eventReader.read(eventObj);
                 }
             }

http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
index fee0425..f7bb74e 100644
--- 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
@@ -128,10 +128,9 @@ public class Constants {
     public static final String MR_RUNNING_TASK_ATTEMPT_EXECUTION_SERVICE_NAME 
= "RunningTaskAttemptExecutionService";
     public static final String MR_JOB_PROCESS_TIME_STAMP_NAME = 
"JobProcessTimeStampService";
     public static final String MR_JOB_OPTIMIZER_SUGGESTION_SERVICE_NAME = 
"JobOptimizerSuggestionService";
+    public static final String MR_JOB_RPC_ANALYSIS_SERVICE_NAME = 
"JobRpcAnalysisService";
     public static final String ACCEPTED_APP_SERVICE_NAME = 
"AcceptedAppService";
 
-    public static final String JOB_TASK_TYPE_TAG = "taskType";
-
     public static class JobConfiguration {
         // job type
         public static final String SCOOBI_JOB = "scoobi.mode";

http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java
index 9811772..9b0f79f 100644
--- 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java
@@ -43,7 +43,6 @@ public enum MRJobTagName {
     }
 
     public String toString() {
-
         return this.tagName;
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
index e337c13..66cedc3 100644
--- 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
@@ -56,6 +56,14 @@ public final class JobCounters implements Serializable {
         }
     }
 
+    /**
+     * Looks up a single counter.
+     *
+     * @param groupName   key of the counter group
+     * @param counterName key of the counter inside that group
+     * @return the stored value, or {@code 0L} when the group or the counter is absent
+     */
+    public Long getCounterValue(String groupName, String counterName) {
+        boolean groupMissing = !counters.containsKey(groupName);
+        if (groupMissing || !counters.get(groupName).containsKey(counterName)) {
+            return 0L;
+        }
+        return counters.get(groupName).get(counterName);
+    }
+
     public static enum GroupName {
         FileSystemCounters("org.apache.hadoop.mapreduce.FileSystemCounter", 
"FileSystemCounters"),
         MapReduceTaskCounter("org.apache.hadoop.mapreduce.TaskCounter", 
"MapReduceTaskCounter"),
@@ -80,10 +88,15 @@ public final class JobCounters implements Serializable {
 
     public static enum CounterName {
 
-        FILE_BYTES_READ(GroupName.FileSystemCounters, "FILE_BYTES_READ", 
"FILE_BYTES_READ"),
-        FILE_BYTES_WRITTEN(GroupName.FileSystemCounters, "FILE_BYTES_WRITTEN", 
"FILE_BYTES_WRITTEN"),
-        HDFS_BYTES_READ(GroupName.FileSystemCounters, "HDFS_BYTES_READ", 
"HDFS_BYTES_READ"),
-        HDFS_BYTES_WRITTEN(GroupName.FileSystemCounters, "HDFS_BYTES_WRITTEN", 
"HDFS_BYTES_WRITTEN"),
+        FILE_BYTES_READ(GroupName.FileSystemCounters, "FILE_BYTES_READ", "File 
read bytes"),
+        FILE_BYTES_WRITTEN(GroupName.FileSystemCounters, "FILE_BYTES_WRITTEN", 
"File written bytes"),
+        HDFS_BYTES_READ(GroupName.FileSystemCounters, "HDFS_BYTES_READ", "HDFS 
read bytes"),
+        HDFS_BYTES_WRITTEN(GroupName.FileSystemCounters, "HDFS_BYTES_WRITTEN", 
"HDFS written bytes"),
+        HDFS_READ_OPS(GroupName.FileSystemCounters, "HDFS_READ_OPS", "HDFS 
read ops"),
+        HDFS_WRITE_OPS(GroupName.FileSystemCounters, "HDFS_WRITE_OPS", "HDFS 
write ops"),
+
+        MILLIS_MAPS(GroupName.MapReduceJobCounter, "MILLIS_MAPS", "total maps 
mills"),
+        MILLIS_REDUCES(GroupName.MapReduceJobCounter, "MILLIS_REDUCES", "total 
reduce mills"),
 
         MAP_INPUT_RECORDS(GroupName.MapReduceTaskCounter, "MAP_INPUT_RECORDS", 
"Map input records"),
         MAP_OUTPUT_RECORDS(GroupName.MapReduceTaskCounter, 
"MAP_OUTPUT_RECORDS", "Map output records"),

Reply via email to