Repository: eagle
Updated Branches:
  refs/heads/master e8a58b66f -> f02b6df19
MINOR: enrich MAP_REDUCE_JOB_STREAM Add 5 fields in MAP_REDUCE_JOB_STREAM * numTotalMaps * numTotalReduces * duration * avgMapTime * avgReduceTime Author: Zhao, Qingwen <qingwz...@apache.org> Closes #940 from qingwen220/minor. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/f02b6df1 Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/f02b6df1 Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/f02b6df1 Branch: refs/heads/master Commit: f02b6df19df1caa69ffc142f5e89236b5a172abe Parents: e8a58b6 Author: Zhao, Qingwen <qingwz...@apache.org> Authored: Sat May 27 16:00:11 2017 +0800 Committer: Zhao, Qingwen <qingwz...@apache.org> Committed: Sat May 27 16:00:11 2017 +0800 ---------------------------------------------------------------------- .../jpm/analyzer/mr/rpc/JobRpcEvaluator.java | 41 +++++++++++---- .../historyentity/JobRpcAnalysisAPIEntity.java | 55 ++++++++++++++++++++ .../JobRpcAnalysisStreamPublisher.java | 5 ++ ....history.MRHistoryJobApplicationProvider.xml | 20 +++++++ 4 files changed, 111 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/f02b6df1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java index 457f0c5..86ad2c1 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java @@ -54,8 +54,12 @@ public class JobRpcEvaluator implements Evaluator<MapReduceAnalyzerEntity>, Seri long 
reduceStartTime = Long.MAX_VALUE; long reduceEndTime = 0; + double totalMapTime = 0; + double totalReduceTime = 0; + for (TaskExecutionAPIEntity task : entity.getTasksMap().values()) { if (task.getTags().get(TASK_TYPE.toString()).equalsIgnoreCase(Constants.TaskType.MAP.toString())) { + totalMapTime += task.getDuration(); if (mapStartTime > task.getStartTime()) { mapStartTime = task.getStartTime(); } @@ -63,6 +67,7 @@ public class JobRpcEvaluator implements Evaluator<MapReduceAnalyzerEntity>, Seri mapEndTime = task.getEndTime(); } } else { + totalReduceTime += task.getDuration(); if (reduceStartTime > task.getStartTime()) { reduceStartTime = task.getStartTime(); } @@ -83,26 +88,42 @@ public class JobRpcEvaluator implements Evaluator<MapReduceAnalyzerEntity>, Seri analysisAPIEntity.setTags(tags); analysisAPIEntity.setTimestamp(entity.getStartTime()); analysisAPIEntity.setTrackingUrl(entity.getTrackingUrl()); + analysisAPIEntity.setDuration(entity.getDurationTime()); + analysisAPIEntity.setNumTotalMaps(entity.getTotalMaps()); + analysisAPIEntity.setNumTotalReduces(entity.getTotalReduces()); + analysisAPIEntity.setCurrentState(entity.getCurrentState()); + + double avgOpsPerMap = 0; + double avgMapTime = 0; + double avgOpsPerReduce = 0; + double avgReduceTime = 0; + double mapOpsPerSecond = 0; + double reduceOpsPerSecond = 0; + + if (entity.getTotalMaps() > 0) { + avgMapTime = totalMapTime / entity.getTotalMaps(); + avgOpsPerMap = totalMapHdfsOps / entity.getTotalMaps(); + mapOpsPerSecond = totalMapHdfsOps / ((mapEndTime - mapStartTime) / 1000); + } + if (entity.getTotalReduces() > 0) { + avgReduceTime = totalReduceTime / entity.getTotalReduces(); + avgOpsPerReduce = totalReduceHdfsOps / entity.getTotalReduces(); + reduceOpsPerSecond = totalReduceHdfsOps / ((reduceEndTime - reduceStartTime) / 1000); + } double totalOpsPerSecond = (entity.getDurationTime() == 0) ? 
0 : (totalMapHdfsOps + totalReduceHdfsOps) / (entity.getDurationTime() / 1000); - double mapOpsPerSecond = (entity.getTotalMaps() == 0) ? 0 : - totalMapHdfsOps / ((mapEndTime - mapStartTime) / 1000); - double reduceOpsPerSecond = (entity.getTotalReduces() == 0) ? 0 : - totalReduceHdfsOps / ((reduceEndTime - reduceStartTime) / 1000); - - double avgOpsPerTask = (totalMapHdfsOps + totalReduceHdfsOps) / (entity.getTotalMaps() + entity.getTotalReduces()); - double avgOpsPerMap = (entity.getTotalMaps() == 0) ? 0 : - totalMapHdfsOps / entity.getTotalMaps(); - double avgOpsPerReduce = (entity.getTotalReduces() == 0) ? 0 : - totalReduceHdfsOps / entity.getTotalReduces(); + double avgOpsPerTask = (totalMapHdfsOps + totalReduceHdfsOps) / (entity.getTotalMaps() + entity.getTotalReduces()); + analysisAPIEntity.setTotalOpsPerSecond(totalOpsPerSecond); analysisAPIEntity.setMapOpsPerSecond(mapOpsPerSecond); analysisAPIEntity.setReduceOpsPerSecond(reduceOpsPerSecond); analysisAPIEntity.setAvgOpsPerTask(avgOpsPerTask); analysisAPIEntity.setAvgOpsPerMap(avgOpsPerMap); analysisAPIEntity.setAvgOpsPerReduce(avgOpsPerReduce); + analysisAPIEntity.setAvgMapTime(avgMapTime); + analysisAPIEntity.setAvgReduceTime(avgReduceTime); Result.EvaluatorResult result = new Result.EvaluatorResult(); result.addProcessorEntity(JobRpcEvaluator.class, analysisAPIEntity); http://git-wip-us.apache.org/repos/asf/eagle/blob/f02b6df1/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobRpcAnalysisAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobRpcAnalysisAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobRpcAnalysisAPIEntity.java index 6c0e539..ec04286 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobRpcAnalysisAPIEntity.java +++ 
b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobRpcAnalysisAPIEntity.java @@ -51,6 +51,16 @@ public class JobRpcAnalysisAPIEntity extends TaggedLogAPIEntity { private double avgOpsPerMap; @Column("h") private double avgOpsPerReduce; + @Column("i") + private double avgMapTime; + @Column("j") + private double avgReduceTime; + @Column("k") + private int numTotalMaps; + @Column("l") + private int numTotalReduces; + @Column("m") + private long duration; public String getTrackingUrl() { return trackingUrl; @@ -124,5 +134,50 @@ public class JobRpcAnalysisAPIEntity extends TaggedLogAPIEntity { valueChanged("avgOpsPerReduce"); } + public double getAvgMapTime() { + return avgMapTime; + } + + public void setAvgMapTime(double avgMapTime) { + this.avgMapTime = avgMapTime; + valueChanged("avgMapTime"); + } + + public double getAvgReduceTime() { + return avgReduceTime; + } + + public void setAvgReduceTime(double avgReduceTime) { + this.avgReduceTime = avgReduceTime; + valueChanged("avgReduceTime"); + } + + public int getNumTotalMaps() { + return numTotalMaps; + } + + public void setNumTotalMaps(int numTotalMaps) { + this.numTotalMaps = numTotalMaps; + valueChanged("numTotalMaps"); + } + + public int getNumTotalReduces() { + return numTotalReduces; + } + + public void setNumTotalReduces(int numTotalReduces) { + this.numTotalReduces = numTotalReduces; + valueChanged("numTotalReduces"); + } + + public long getDuration() { + return duration; + } + + public void setDuration(long duration) { + this.duration = duration; + valueChanged("duration"); + } + } http://git-wip-us.apache.org/repos/asf/eagle/blob/f02b6df1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobRpcAnalysisStreamPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobRpcAnalysisStreamPublisher.java 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobRpcAnalysisStreamPublisher.java index 3b91fbf..5549b56 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobRpcAnalysisStreamPublisher.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobRpcAnalysisStreamPublisher.java @@ -47,6 +47,11 @@ public class JobRpcAnalysisStreamPublisher extends StreamPublisher<JobRpcAnalysi fields.put("avgOpsPerMap", entity.getAvgOpsPerMap()); fields.put("avgOpsPerReduce", entity.getAvgOpsPerReduce()); fields.put("currentState", entity.getCurrentState()); + fields.put("numTotalMaps", entity.getNumTotalMaps()); + fields.put("numTotalReduces", entity.getNumTotalReduces()); + fields.put("duration", entity.getDuration()); + fields.put("avgMapTime", entity.getAvgMapTime()); + fields.put("avgReduceTime", entity.getAvgReduceTime()); collector.collect(stormStreamId, new ValuesArray(fields.get(MRJobTagName.JOB_ID.toString()), fields)); } http://git-wip-us.apache.org/repos/asf/eagle/blob/f02b6df1/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml index 01c5e59..90c002a 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml @@ -226,6 +226,26 @@ <defaultValue>0.0</defaultValue> </column> 
<column> + <name>avgMapTime</name> + <type>double</type> + </column> + <column> + <name>avgReduceTime</name> + <type>double</type> + </column> + <column> + <name>numTotalMaps</name> + <type>int</type> + </column> + <column> + <name>numTotalReduces</name> + <type>int</type> + </column> + <column> + <name>duration</name> + <type>long</type> + </column> + <column> <name>currentState</name> <type>string</type> </column>