Repository: eagle
Updated Branches:
  refs/heads/master e8a58b66f -> f02b6df19


MINOR: enrich MAP_REDUCE_JOB_STREAM

Add 5 fields in MAP_REDUCE_JOB_STREAM
* numTotalMaps
* numTotalReduces
* duration
* avgMapTime
* avgReduceTime

Author: Zhao, Qingwen <qingwz...@apache.org>

Closes #940 from qingwen220/minor.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/f02b6df1
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/f02b6df1
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/f02b6df1

Branch: refs/heads/master
Commit: f02b6df19df1caa69ffc142f5e89236b5a172abe
Parents: e8a58b6
Author: Zhao, Qingwen <qingwz...@apache.org>
Authored: Sat May 27 16:00:11 2017 +0800
Committer: Zhao, Qingwen <qingwz...@apache.org>
Committed: Sat May 27 16:00:11 2017 +0800

----------------------------------------------------------------------
 .../jpm/analyzer/mr/rpc/JobRpcEvaluator.java    | 41 +++++++++++----
 .../historyentity/JobRpcAnalysisAPIEntity.java  | 55 ++++++++++++++++++++
 .../JobRpcAnalysisStreamPublisher.java          |  5 ++
 ....history.MRHistoryJobApplicationProvider.xml | 20 +++++++
 4 files changed, 111 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/f02b6df1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java
 
b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java
index 457f0c5..86ad2c1 100644
--- 
a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java
+++ 
b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java
@@ -54,8 +54,12 @@ public class JobRpcEvaluator implements 
Evaluator<MapReduceAnalyzerEntity>, Seri
             long reduceStartTime = Long.MAX_VALUE;
             long reduceEndTime = 0;
 
+            double totalMapTime = 0;
+            double totalReduceTime = 0;
+
             for (TaskExecutionAPIEntity task : entity.getTasksMap().values()) {
                 if 
(task.getTags().get(TASK_TYPE.toString()).equalsIgnoreCase(Constants.TaskType.MAP.toString()))
 {
+                    totalMapTime += task.getDuration();
                     if (mapStartTime > task.getStartTime()) {
                         mapStartTime = task.getStartTime();
                     }
@@ -63,6 +67,7 @@ public class JobRpcEvaluator implements 
Evaluator<MapReduceAnalyzerEntity>, Seri
                         mapEndTime = task.getEndTime();
                     }
                 } else {
+                    totalReduceTime += task.getDuration();
                     if (reduceStartTime > task.getStartTime()) {
                         reduceStartTime = task.getStartTime();
                     }
@@ -83,26 +88,42 @@ public class JobRpcEvaluator implements 
Evaluator<MapReduceAnalyzerEntity>, Seri
             analysisAPIEntity.setTags(tags);
             analysisAPIEntity.setTimestamp(entity.getStartTime());
             analysisAPIEntity.setTrackingUrl(entity.getTrackingUrl());
+            analysisAPIEntity.setDuration(entity.getDurationTime());
+            analysisAPIEntity.setNumTotalMaps(entity.getTotalMaps());
+            analysisAPIEntity.setNumTotalReduces(entity.getTotalReduces());
+            analysisAPIEntity.setCurrentState(entity.getCurrentState());
+
+            double avgOpsPerMap = 0;
+            double avgMapTime = 0;
+            double avgOpsPerReduce = 0;
+            double avgReduceTime = 0;
+            double mapOpsPerSecond = 0;
+            double reduceOpsPerSecond = 0;
+
+            if (entity.getTotalMaps() > 0) {
+                avgMapTime = totalMapTime / entity.getTotalMaps();
+                avgOpsPerMap = totalMapHdfsOps / entity.getTotalMaps();
+                mapOpsPerSecond = totalMapHdfsOps / ((mapEndTime - 
mapStartTime) / 1000);
+            }
+            if (entity.getTotalReduces() > 0) {
+                avgReduceTime = totalReduceTime / entity.getTotalReduces();
+                avgOpsPerReduce = totalReduceHdfsOps / 
entity.getTotalReduces();
+                reduceOpsPerSecond = totalReduceHdfsOps / ((reduceEndTime - 
reduceStartTime) / 1000);
+            }
 
             double totalOpsPerSecond = (entity.getDurationTime() == 0) ? 0 :
                     (totalMapHdfsOps + totalReduceHdfsOps) / 
(entity.getDurationTime() / 1000);
-            double mapOpsPerSecond = (entity.getTotalMaps() == 0) ? 0 :
-                    totalMapHdfsOps / ((mapEndTime - mapStartTime) / 1000);
-            double reduceOpsPerSecond = (entity.getTotalReduces() == 0) ? 0 :
-                    totalReduceHdfsOps / ((reduceEndTime - reduceStartTime) / 
1000);
-            
-            double avgOpsPerTask = (totalMapHdfsOps + totalReduceHdfsOps) / 
(entity.getTotalMaps() + entity.getTotalReduces());
-            double avgOpsPerMap = (entity.getTotalMaps() == 0) ? 0 :
-                    totalMapHdfsOps / entity.getTotalMaps();
-            double avgOpsPerReduce = (entity.getTotalReduces() == 0) ? 0 :
-                    totalReduceHdfsOps / entity.getTotalReduces();
 
+            double avgOpsPerTask = (totalMapHdfsOps + totalReduceHdfsOps) / 
(entity.getTotalMaps() + entity.getTotalReduces());
+            
             analysisAPIEntity.setTotalOpsPerSecond(totalOpsPerSecond);
             analysisAPIEntity.setMapOpsPerSecond(mapOpsPerSecond);
             analysisAPIEntity.setReduceOpsPerSecond(reduceOpsPerSecond);
             analysisAPIEntity.setAvgOpsPerTask(avgOpsPerTask);
             analysisAPIEntity.setAvgOpsPerMap(avgOpsPerMap);
             analysisAPIEntity.setAvgOpsPerReduce(avgOpsPerReduce);
+            analysisAPIEntity.setAvgMapTime(avgMapTime);
+            analysisAPIEntity.setAvgReduceTime(avgReduceTime);
 
             Result.EvaluatorResult result = new Result.EvaluatorResult();
             result.addProcessorEntity(JobRpcEvaluator.class, 
analysisAPIEntity);

http://git-wip-us.apache.org/repos/asf/eagle/blob/f02b6df1/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobRpcAnalysisAPIEntity.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobRpcAnalysisAPIEntity.java
 
b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobRpcAnalysisAPIEntity.java
index 6c0e539..ec04286 100644
--- 
a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobRpcAnalysisAPIEntity.java
+++ 
b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobRpcAnalysisAPIEntity.java
@@ -51,6 +51,16 @@ public class JobRpcAnalysisAPIEntity extends 
TaggedLogAPIEntity {
     private double avgOpsPerMap;
     @Column("h")
     private double avgOpsPerReduce;
+    @Column("i")
+    private double avgMapTime;
+    @Column("j")
+    private double avgReduceTime;
+    @Column("k")
+    private int numTotalMaps;
+    @Column("l")
+    private int numTotalReduces;
+    @Column("m")
+    private long duration;
 
     public String getTrackingUrl() {
         return trackingUrl;
@@ -124,5 +134,50 @@ public class JobRpcAnalysisAPIEntity extends 
TaggedLogAPIEntity {
         valueChanged("avgOpsPerReduce");
     }
 
+    public double getAvgMapTime() {
+        return avgMapTime;
+    }
+
+    public void setAvgMapTime(double avgMapTime) {
+        this.avgMapTime = avgMapTime;
+        valueChanged("avgMapTime");
+    }
+
+    public double getAvgReduceTime() {
+        return avgReduceTime;
+    }
+
+    public void setAvgReduceTime(double avgReduceTime) {
+        this.avgReduceTime = avgReduceTime;
+        valueChanged("avgReduceTime");
+    }
+
+    public int getNumTotalMaps() {
+        return numTotalMaps;
+    }
+
+    public void setNumTotalMaps(int numTotalMaps) {
+        this.numTotalMaps = numTotalMaps;
+        valueChanged("numTotalMaps");
+    }
+
+    public int getNumTotalReduces() {
+        return numTotalReduces;
+    }
+
+    public void setNumTotalReduces(int numTotalReduces) {
+        this.numTotalReduces = numTotalReduces;
+        valueChanged("numTotalReduces");
+    }
+
+    public long getDuration() {
+        return duration;
+    }
+
+    public void setDuration(long duration) {
+        this.duration = duration;
+        valueChanged("duration");
+    }
+
 }
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/f02b6df1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobRpcAnalysisStreamPublisher.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobRpcAnalysisStreamPublisher.java
 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobRpcAnalysisStreamPublisher.java
index 3b91fbf..5549b56 100644
--- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobRpcAnalysisStreamPublisher.java
+++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobRpcAnalysisStreamPublisher.java
@@ -47,6 +47,11 @@ public class JobRpcAnalysisStreamPublisher extends 
StreamPublisher<JobRpcAnalysi
         fields.put("avgOpsPerMap", entity.getAvgOpsPerMap());
         fields.put("avgOpsPerReduce", entity.getAvgOpsPerReduce());
         fields.put("currentState", entity.getCurrentState());
+        fields.put("numTotalMaps", entity.getNumTotalMaps());
+        fields.put("numTotalReduces", entity.getNumTotalReduces());
+        fields.put("duration", entity.getDuration());
+        fields.put("avgMapTime", entity.getAvgMapTime());
+        fields.put("avgReduceTime", entity.getAvgReduceTime());
 
         collector.collect(stormStreamId, new 
ValuesArray(fields.get(MRJobTagName.JOB_ID.toString()), fields));
     }

http://git-wip-us.apache.org/repos/asf/eagle/blob/f02b6df1/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
 
b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
index 01c5e59..90c002a 100644
--- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
+++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
@@ -226,6 +226,26 @@
                     <defaultValue>0.0</defaultValue>
                 </column>
                 <column>
+                    <name>avgMapTime</name>
+                    <type>double</type>
+                </column>
+                <column>
+                    <name>avgReduceTime</name>
+                    <type>double</type>
+                </column>
+                <column>
+                    <name>numTotalMaps</name>
+                    <type>int</type>
+                </column>
+                <column>
+                    <name>numTotalReduces</name>
+                    <type>int</type>
+                </column>
+                <column>
+                    <name>duration</name>
+                    <type>long</type>
+                </column>
+                <column>
                     <name>currentState</name>
                     <type>string</type>
                 </column>

Reply via email to