Repository: eagle Updated Branches: refs/heads/master af9b056dc -> 43bd197d6
[EAGLE-1024] Monitor jobs with high RPC throughput https://issues.apache.org/jira/browse/EAGLE-1024 * add job RPC data in MAP_REDUCE_JOB_STREAM * refactor org.apache.eagle.jpm.analyzer.publisher.EmailPublisher * support new config 'application.analyzerReport.alertLevel' to define alert level Author: Zhao, Qingwen <qingwz...@apache.org> Closes #938 from qingwen220/EAGLE-1024. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/43bd197d Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/43bd197d Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/43bd197d Branch: refs/heads/master Commit: 43bd197d6a2fdbb3781cf7d91928be404ae941aa Parents: af9b056 Author: Zhao, Qingwen <qingwz...@apache.org> Authored: Fri May 19 18:07:53 2017 +0800 Committer: Zhao, Qingwen <qingwz...@apache.org> Committed: Fri May 19 18:07:53 2017 +0800 ---------------------------------------------------------------------- .../org/apache/eagle/alert/utils/JsonUtils.java | 3 +- eagle-jpm/eagle-jpm-analyzer/pom.xml | 6 + .../apache/eagle/jpm/analyzer/JobAnalyzer.java | 3 +- .../meta/model/MapReduceAnalyzerEntity.java | 9 ++ .../analyzer/mr/MRJobPerformanceAnalyzer.java | 9 +- .../jpm/analyzer/mr/rpc/JobRpcEvaluator.java | 124 ++++++++++++++++++ .../analyzer/publisher/EagleStorePublisher.java | 4 +- .../jpm/analyzer/publisher/EmailPublisher.java | 66 ++++++---- .../eagle/jpm/analyzer/publisher/Result.java | 33 ++--- .../eagle/jpm/analyzer/EmailPublisherTest.java | 80 ++++++++++++ .../apache/eagle/jpm/analyzer/ResultTest.java | 38 ++++++ .../src/test/resources/application.conf | 44 +++++++ .../mr/historyentity/JPAEntityRepository.java | 1 + .../historyentity/JobRpcAnalysisAPIEntity.java | 128 +++++++++++++++++++ .../historyentity/JobSuggestionAPIEntity.java | 2 +- .../jpm/mr/history/MRHistoryJobApplication.java | 9 +- .../history/crawler/JHFCrawlerDriverImpl.java | 2 +- .../metrics/JobCounterMetricsGenerator.java | 1 - 
.../parser/JobEntityLifecycleAggregator.java | 4 +- .../history/parser/JobSuggestionListener.java | 30 ++++- .../JobRpcAnalysisStreamPublisher.java | 55 ++++++++ .../history/publisher/JobStreamPublisher.java | 1 - ....history.MRHistoryJobApplicationProvider.xml | 34 +++-- .../jpm/spark/history/crawl/JHFSparkParser.java | 2 +- .../org/apache/eagle/jpm/util/Constants.java | 3 +- .../org/apache/eagle/jpm/util/MRJobTagName.java | 1 - .../eagle/jpm/util/jobcounter/JobCounters.java | 21 ++- 27 files changed, 633 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/JsonUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/JsonUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/JsonUtils.java index cc75d34..c5a57f6 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/JsonUtils.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/JsonUtils.java @@ -17,6 +17,7 @@ package org.apache.eagle.alert.utils; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang.StringUtils; import org.codehaus.jettison.json.JSONArray; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +49,7 @@ public class JsonUtils { } } } catch (Exception e) { - LOG.warn("exception found {}", e); + LOG.warn("illegal json array message: {}, ignored", StringUtils.abbreviate(message, 50)); } return result; http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-analyzer/pom.xml 
---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/pom.xml b/eagle-jpm/eagle-jpm-analyzer/pom.xml index a2943df..729397c 100644 --- a/eagle-jpm/eagle-jpm-analyzer/pom.xml +++ b/eagle-jpm/eagle-jpm-analyzer/pom.xml @@ -60,5 +60,11 @@ <artifactId>eagle-metadata-jdbc</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>dumbster</groupId> + <artifactId>dumbster</artifactId> + <version>1.6</version> + <scope>test</scope> + </dependency> </dependencies> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java index 1e9c00e..1028c35 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java @@ -18,6 +18,7 @@ package org.apache.eagle.jpm.analyzer; import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity; +import org.apache.eagle.jpm.analyzer.publisher.Result; /** * Each JobAnalyzer contains one or more Evaluators to analyze each job. 
@@ -26,5 +27,5 @@ import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity; * */ public interface JobAnalyzer<T extends AnalyzerEntity> { - void analyze(T analyzerEntity) throws Exception; + Result analyze(T analyzerEntity) throws Exception; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/MapReduceAnalyzerEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/MapReduceAnalyzerEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/MapReduceAnalyzerEntity.java index cd6249d..604c7f4 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/MapReduceAnalyzerEntity.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/MapReduceAnalyzerEntity.java @@ -41,6 +41,7 @@ public class MapReduceAnalyzerEntity extends AnalyzerEntity { private Map<String, TaskExecutionAPIEntity> tasksMap; private Map<String, TaskAttemptExecutionAPIEntity> completedTaskAttemptsMap; private Configuration jobConf; + private String trackingUrl; public MapReduceAnalyzerEntity() { this.setEndTime(-1); @@ -171,4 +172,12 @@ public class MapReduceAnalyzerEntity extends AnalyzerEntity { this.jobConf = jobConf; } + public String getTrackingUrl() { + return trackingUrl; + } + + public void setTrackingUrl(String trackingUrl) { + this.trackingUrl = trackingUrl; + } + } http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java 
b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java index cca6b18..0efa323 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java @@ -21,6 +21,7 @@ import com.typesafe.config.Config; import org.apache.eagle.jpm.analyzer.*; import org.apache.eagle.jpm.analyzer.Evaluator; import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity; +import org.apache.eagle.jpm.analyzer.mr.rpc.JobRpcEvaluator; import org.apache.eagle.jpm.analyzer.mr.sla.SLAJobEvaluator; import org.apache.eagle.jpm.analyzer.mr.suggestion.JobSuggestionEvaluator; import org.apache.eagle.jpm.analyzer.publisher.EagleStorePublisher; @@ -42,13 +43,12 @@ public class MRJobPerformanceAnalyzer<T extends AnalyzerEntity> implements JobAn private List<Evaluator> evaluators = new ArrayList<>(); private List<Publisher> publishers = new ArrayList<>(); - private Config config; private AlertDeduplicator alertDeduplicator; public MRJobPerformanceAnalyzer(Config config) { - this.config = config; evaluators.add(new SLAJobEvaluator(config)); evaluators.add(new JobSuggestionEvaluator(config)); + evaluators.add(new JobRpcEvaluator()); publishers.add(new EagleStorePublisher(config)); publishers.add(new EmailPublisher(config)); @@ -57,7 +57,7 @@ public class MRJobPerformanceAnalyzer<T extends AnalyzerEntity> implements JobAn } @Override - public void analyze(T analyzerJobEntity) throws Exception { + public Result analyze(T analyzerJobEntity) throws Exception { Result result = new Result(); for (Evaluator evaluator : evaluators) { @@ -73,11 +73,12 @@ public class MRJobPerformanceAnalyzer<T extends AnalyzerEntity> implements JobAn if (alertDeduplicator.dedup(analyzerJobEntity, result)) { LOG.info("skip publish job {} alert because it is duplicated", analyzerJobEntity.getJobId()); - return; + return 
null; } for (Publisher publisher : publishers) { publisher.publish(analyzerJobEntity, result); } + return result; } } http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java new file mode 100644 index 0000000..457f0c5 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.jpm.analyzer.mr.rpc; + +import org.apache.eagle.jpm.analyzer.Evaluator; +import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity; +import org.apache.eagle.jpm.analyzer.publisher.Result; +import org.apache.eagle.jpm.mr.historyentity.JobRpcAnalysisAPIEntity; +import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity; +import org.apache.eagle.jpm.util.Constants; +import org.apache.eagle.jpm.util.jobcounter.JobCounters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.eagle.jpm.util.MRJobTagName.*; + +public class JobRpcEvaluator implements Evaluator<MapReduceAnalyzerEntity>, Serializable { + private static final Logger LOG = LoggerFactory.getLogger(JobRpcEvaluator.class); + + @Override + public Result.EvaluatorResult evaluate(MapReduceAnalyzerEntity entity) { + try { + double totalMapHdfsOps = 0; + double totalReduceHdfsOps = 0; + + if (entity.getFinishedMaps() != 0) { + totalMapHdfsOps = getTotalHdfsOps(entity.getMapCounters()); + } + if (entity.getFinishedReduces() != 0) { + totalReduceHdfsOps = getTotalHdfsOps(entity.getReduceCounters()); + } + + long mapStartTime = Long.MAX_VALUE; + long mapEndTime = 0; + long reduceStartTime = Long.MAX_VALUE; + long reduceEndTime = 0; + + for (TaskExecutionAPIEntity task : entity.getTasksMap().values()) { + if (task.getTags().get(TASK_TYPE.toString()).equalsIgnoreCase(Constants.TaskType.MAP.toString())) { + if (mapStartTime > task.getStartTime()) { + mapStartTime = task.getStartTime(); + } + if (mapEndTime < task.getEndTime()) { + mapEndTime = task.getEndTime(); + } + } else { + if (reduceStartTime > task.getStartTime()) { + reduceStartTime = task.getStartTime(); + } + if (reduceEndTime < task.getEndTime()) { + reduceEndTime = task.getEndTime(); + } + } + } + + Map<String, String> tags = new HashMap<>(); + tags.put(SITE.toString(), entity.getSiteId()); + 
tags.put(USER.toString(), entity.getUserId()); + tags.put(JOB_QUEUE.toString(), entity.getJobQueueName()); + tags.put(JOD_DEF_ID.toString(), entity.getJobDefId()); + tags.put(JOB_TYPE.toString(), entity.getJobType()); + tags.put(JOB_ID.toString(), entity.getJobId()); + JobRpcAnalysisAPIEntity analysisAPIEntity = new JobRpcAnalysisAPIEntity(); + analysisAPIEntity.setTags(tags); + analysisAPIEntity.setTimestamp(entity.getStartTime()); + analysisAPIEntity.setTrackingUrl(entity.getTrackingUrl()); + + double totalOpsPerSecond = (entity.getDurationTime() == 0) ? 0 : + (totalMapHdfsOps + totalReduceHdfsOps) / (entity.getDurationTime() / 1000); + double mapOpsPerSecond = (entity.getTotalMaps() == 0) ? 0 : + totalMapHdfsOps / ((mapEndTime - mapStartTime) / 1000); + double reduceOpsPerSecond = (entity.getTotalReduces() == 0) ? 0 : + totalReduceHdfsOps / ((reduceEndTime - reduceStartTime) / 1000); + + double avgOpsPerTask = (totalMapHdfsOps + totalReduceHdfsOps) / (entity.getTotalMaps() + entity.getTotalReduces()); + double avgOpsPerMap = (entity.getTotalMaps() == 0) ? 0 : + totalMapHdfsOps / entity.getTotalMaps(); + double avgOpsPerReduce = (entity.getTotalReduces() == 0) ? 
0 : + totalReduceHdfsOps / entity.getTotalReduces(); + + analysisAPIEntity.setTotalOpsPerSecond(totalOpsPerSecond); + analysisAPIEntity.setMapOpsPerSecond(mapOpsPerSecond); + analysisAPIEntity.setReduceOpsPerSecond(reduceOpsPerSecond); + analysisAPIEntity.setAvgOpsPerTask(avgOpsPerTask); + analysisAPIEntity.setAvgOpsPerMap(avgOpsPerMap); + analysisAPIEntity.setAvgOpsPerReduce(avgOpsPerReduce); + + Result.EvaluatorResult result = new Result.EvaluatorResult(); + result.addProcessorEntity(JobRpcEvaluator.class, analysisAPIEntity); + + return result; + } catch (Exception e) { + LOG.error("Rpc analysis failed for job {} due to {}", entity.getJobId(), e.getMessage()); + return null; + } + + } + + private long getTotalHdfsOps(JobCounters counter) { + long mapHdfsReadOps = counter.getCounterValue(JobCounters.CounterName.HDFS_READ_OPS); + long mapHdfsWriteOps = counter.getCounterValue(JobCounters.CounterName.HDFS_WRITE_OPS); + return mapHdfsReadOps + mapHdfsWriteOps; + } + +} http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java index 1c5a033..349a611 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java @@ -44,11 +44,11 @@ public class EagleStorePublisher implements Publisher, Serializable { @Override public void publish(AnalyzerEntity analyzerJobEntity, Result result) { - if (result.getAlertMessages().size() == 0) { + if (result.getAlertEntities().size() == 0) { return; } - 
LOG.info("EagleStorePublisher gets job {}", analyzerJobEntity.getJobDefId()); + LOG.info("EagleStorePublisher gets job {}", analyzerJobEntity.getJobId()); try { this.client = new EagleServiceClientImpl(config); http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java index 4ec946d..f495cf8 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java @@ -28,8 +28,10 @@ import org.apache.eagle.common.mail.AlertEmailContext; import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity; import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity; import org.apache.eagle.jpm.analyzer.mr.sla.SLAJobEvaluator; +import org.apache.eagle.jpm.analyzer.mr.suggestion.JobSuggestionEvaluator; import org.apache.eagle.jpm.analyzer.util.Constants; import org.apache.eagle.jpm.analyzer.util.Utils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +40,7 @@ import java.util.*; public class EmailPublisher implements Publisher, Serializable { private static final Logger LOG = LoggerFactory.getLogger(EmailPublisher.class); + private static final String ALERT_LEVEL_PATH = "application.analyzerReport.alertLevel"; private Config config; private List<Class<?>> publishResult4Evaluators = new ArrayList<>(); @@ -47,6 +50,8 @@ public class EmailPublisher implements Publisher, Serializable { this.config = config; //TODO publishResult4Evaluators.add(SLAJobEvaluator.class); + 
publishResult4Evaluators.add(JobSuggestionEvaluator.class); + timeZone = TimeZone.getTimeZone((config.hasPath(EagleConfigConstants.EAGLE_TIME_ZONE) ? config.getString(EagleConfigConstants.EAGLE_TIME_ZONE) : EagleConfigConstants.DEFAULT_EAGLE_TIME_ZONE)); @@ -60,11 +65,34 @@ public class EmailPublisher implements Publisher, Serializable { return; } - Map<String, List<Result.ProcessorResult>> extend = new HashMap<>(); + Map<String, List<Result.ProcessorResult>> targetResult = new HashMap<>(); publishResult4Evaluators .stream() .filter(item -> result.getAlertMessages().containsKey(item.getName())) - .forEach(item -> extend.put(item.getName(), result.getAlertMessages().get(item.getName()))); + .forEach(item -> targetResult.put(item.getName(), result.getAlertMessages().get(item.getName()))); + + Map<String, List<Result.ProcessorResult>> extend = new HashMap<>(); + + Result.ResultLevel serverAlertLevel = getServerAlertLevel(); + Result.ResultLevel jobAlertLevel = serverAlertLevel; + for (String evaluator : targetResult.keySet()) { + List<Result.ProcessorResult> alertMessages = new ArrayList<>(); + for (Result.ProcessorResult message : targetResult.get(evaluator)) { + if (isHigherAlertLevel(message.getResultLevel(), serverAlertLevel)) { + alertMessages.add(message); + + if (isHigherAlertLevel(message.getResultLevel(), jobAlertLevel)) { + jobAlertLevel = message.getResultLevel(); + } + + LOG.info("Job [{}] Got Message [{}], Level [{}] By Evaluator [{}]", + analyzerJobEntity.getJobId(), message.getMessage(), message.getResultLevel(), evaluator); + } + } + if (!alertMessages.isEmpty()) { + extend.put(evaluator, alertMessages); + } + } if (extend.size() == 0) { return; @@ -82,19 +110,12 @@ public class EmailPublisher implements Publisher, Serializable { basic.put("end", analyzerJobEntity.getEndTime() == 0 ? 
"0" : DateTimeUtil.secondsToHumanDate(analyzerJobEntity.getEndTime() / 1000, timeZone)); - double progress = analyzerJobEntity.getCurrentState().equalsIgnoreCase(org.apache.eagle.jpm.util.Constants.JobState.RUNNING.toString()) ? analyzerJobEntity.getProgress() : 100; + double progress = org.apache.eagle.jpm.util.Constants.JobState.RUNNING.toString().equalsIgnoreCase(analyzerJobEntity.getCurrentState()) ? analyzerJobEntity.getProgress() : 100; basic.put("progress", progress + "%"); basic.put("detail", getJobLink(analyzerJobEntity)); Map<String, Object> alertData = new HashMap<>(); - for (String evaluator : extend.keySet()) { - for (Result.ProcessorResult message : extend.get(evaluator)) { - setAlertLevel(alertData, message.getResultLevel()); - LOG.info("Job [{}] Got Message [{}], Level [{}] By Evaluator [{}]", - analyzerJobEntity.getJobDefId(), message.getMessage(), message.getResultLevel(), evaluator); - } - } - + alertData.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, jobAlertLevel.toString()); alertData.put(Constants.ANALYZER_REPORT_DATA_BASIC_KEY, basic); alertData.put(Constants.ANALYZER_REPORT_DATA_EXTEND_KEY, extend); Config cloneConfig = ConfigFactory.empty().withFallback(config); @@ -124,18 +145,19 @@ public class EmailPublisher implements Publisher, Serializable { + analyzerJobEntity.getJobId(); } - private void setAlertLevel(Map<String, Object> alertData, Result.ResultLevel level) { - if (!alertData.containsKey(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY)) { - alertData.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, Result.ResultLevel.INFO.toString()); - } - - if (level.equals(Result.ResultLevel.CRITICAL)) { - alertData.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, level.toString()); + private Result.ResultLevel getServerAlertLevel() { + Result.ResultLevel alertLevel = Result.ResultLevel.WARNING; + if (config.hasPath(ALERT_LEVEL_PATH)) { + try { + alertLevel = Result.ResultLevel.fromString(config.getString(ALERT_LEVEL_PATH)); + } catch (Exception 
e) { + LOG.warn("invalid alert level config: {}, using WARNING level instead", config.getString(ALERT_LEVEL_PATH)); + } } + return alertLevel; + } - if (level.equals(Result.ResultLevel.WARNING) - && !alertData.get(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY).equals(Result.ResultLevel.CRITICAL.toString())) { - alertData.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, level.toString()); - } + private boolean isHigherAlertLevel(Result.ResultLevel level, Result.ResultLevel serverLevel) { + return level.ordinal() >= serverLevel.ordinal(); } } http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java index 27da4b9..700fae4 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java @@ -35,23 +35,22 @@ public class Result { Map<Class<?>, ProcessorResult> processorResults = result.getProcessorResults(); Map<Class<?>, TaggedLogAPIEntity> processorEntities = result.getProcessorEntities(); + String typeName = type.getName(); + if (!processorEntities.values().isEmpty()) { + alertEntities.put(typeName, new ArrayList<>()); + alertEntities.get(typeName).addAll(processorEntities.values()); + } + for (Class<?> processorType : processorResults.keySet()) { ProcessorResult processorResult = processorResults.get(processorType); - if (processorResult.resultLevel.equals(ResultLevel.NONE)) { - continue; - } - - String typeName = type.getName(); + normalizeResult(processorResult); if (!alertMessages.containsKey(typeName)) { alertMessages.put(typeName, new ArrayList<>()); - 
alertEntities.put(typeName, new ArrayList<>()); } - normalizeResult(processorResult); alertMessages.get(typeName).add(processorResult); - alertEntities.get(typeName).add(processorEntities.get(processorType)); - } + } public Map<String, List<ProcessorResult>> getAlertMessages() { @@ -80,15 +79,17 @@ public class Result { WARNING, CRITICAL; - private static final Map<String, ResultLevel> stringToLevels = new HashMap<>(); - static { - for (ResultLevel level : values()) { - stringToLevels.put(level.toString(), level); - } + public static ResultLevel fromString(String levelString) { + return ResultLevel.valueOf(levelString); } - public static ResultLevel fromString(String levelString) { - return stringToLevels.get(levelString); + public static boolean contains(String levelString) { + for (ResultLevel level : ResultLevel.values()) { + if (level.name().equals(levelString)) { + return true; + } + } + return false; } } http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-analyzer/src/test/java/org/apache/eagle/jpm/analyzer/EmailPublisherTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/test/java/org/apache/eagle/jpm/analyzer/EmailPublisherTest.java b/eagle-jpm/eagle-jpm-analyzer/src/test/java/org/apache/eagle/jpm/analyzer/EmailPublisherTest.java new file mode 100644 index 0000000..e7188d7 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/test/java/org/apache/eagle/jpm/analyzer/EmailPublisherTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer; + +import com.dumbster.smtp.SimpleSmtpServer; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity; +import org.apache.eagle.jpm.analyzer.mr.sla.SLAJobEvaluator; +import org.apache.eagle.jpm.analyzer.mr.suggestion.JobSuggestionEvaluator; +import org.apache.eagle.jpm.analyzer.mr.suggestion.MapReduceCompressionSettingProcessor; +import org.apache.eagle.jpm.analyzer.mr.suggestion.MapReduceDataSkewProcessor; +import org.apache.eagle.jpm.analyzer.publisher.EmailPublisher; +import org.apache.eagle.jpm.analyzer.publisher.Result; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EmailPublisherTest { + + private static final Logger LOG = LoggerFactory.getLogger(EmailPublisherTest.class); + private static final int SMTP_PORT = 5025; + private SimpleSmtpServer server; + + @Before + public void setUp() { + server = SimpleSmtpServer.start(SMTP_PORT); + } + + @After + public void clear() { + if (server != null) { + server.stop(); + } + } + + @Test + public void test() { + AnalyzerEntity analyzerJobEntity = new AnalyzerEntity(); + analyzerJobEntity.setJobId("job1"); + + Result.EvaluatorResult jobSuggestionResult = new Result.EvaluatorResult(); + jobSuggestionResult.addProcessorResult(MapReduceCompressionSettingProcessor.class, new Result.ProcessorResult(Result.RuleType.COMPRESS, Result.ResultLevel.INFO, 
"compress")); + jobSuggestionResult.addProcessorResult(MapReduceDataSkewProcessor.class, new Result.ProcessorResult(Result.RuleType.DATA_SKEW, Result.ResultLevel.WARNING, "data skew")); + + Result.EvaluatorResult jobSlaResult = new Result.EvaluatorResult(); + jobSlaResult.addProcessorResult(Processor.class, new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, Result.ResultLevel.CRITICAL, "long running job")); + + Result result = new Result(); + result.addEvaluatorResult(JobSuggestionEvaluator.class, jobSuggestionResult); + result.addEvaluatorResult(SLAJobEvaluator.class, jobSlaResult); + + Config config = ConfigFactory.load(); + EmailPublisher publisher = new EmailPublisher(config); + publisher.publish(analyzerJobEntity, result); + + Assert.assertTrue(server.getReceivedEmailSize() == 1 ); + Assert.assertTrue(server.getReceivedEmail().hasNext()); + } + + +} http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-analyzer/src/test/java/org/apache/eagle/jpm/analyzer/ResultTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/test/java/org/apache/eagle/jpm/analyzer/ResultTest.java b/eagle-jpm/eagle-jpm-analyzer/src/test/java/org/apache/eagle/jpm/analyzer/ResultTest.java new file mode 100644 index 0000000..2185349 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/test/java/org/apache/eagle/jpm/analyzer/ResultTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer; + +import org.apache.eagle.jpm.analyzer.publisher.Result; +import org.junit.Assert; +import org.junit.Test; + +public class ResultTest { + + @Test + public void testResultLevel() { + String info = "info"; + Assert.assertFalse(Result.ResultLevel.contains(info)); + + String INFO = "INFO"; + Assert.assertTrue(Result.ResultLevel.contains(INFO)); + Assert.assertTrue(Result.ResultLevel.fromString(INFO).equals(Result.ResultLevel.INFO)); + + Assert.assertTrue(Result.ResultLevel.INFO.ordinal() == 1); + Assert.assertTrue(Result.ResultLevel.CRITICAL.ordinal() > Result.ResultLevel.INFO.ordinal()); + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-analyzer/src/test/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/test/resources/application.conf b/eagle-jpm/eagle-jpm-analyzer/src/test/resources/application.conf new file mode 100644 index 0000000..a66e396 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/test/resources/application.conf @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +service { + env = "testing" + host = "localhost" + port = 9090 + username = "admin" + password = "secret" + readTimeOutSeconds = 60 + context = "/rest" + timezone = "UTC" +} + +application { + + mailService { + mailSmtpServer = "localhost", + mailSmtpPort = 5025, + mailSmtpAuth = "false" + //mailSmtpConn = "plaintext", + //mailSmtpUsername = "" + //mailSmtpPassword = "" + //mailSmtpDebug = false + } + analyzerReport { + sender: "a...@example.com" + recipients: "a...@example.com" + template: "AnalyzerReportTemplate.vm" + alertLevel: "CRITICAL" + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java index 54383bd..936aa4e 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java @@ -40,5 +40,6 @@ public class JPAEntityRepository extends EntityRepository { entitySet.add(JobCountEntity.class); entitySet.add(TaskAttemptErrorCategoryEntity.class); entitySet.add(JobSuggestionAPIEntity.class); + entitySet.add(JobRpcAnalysisAPIEntity.class); } } 
http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobRpcAnalysisAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobRpcAnalysisAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobRpcAnalysisAPIEntity.java new file mode 100644 index 0000000..6c0e539 --- /dev/null +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobRpcAnalysisAPIEntity.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.jpm.mr.historyentity; + +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.meta.*; + +import static org.apache.eagle.jpm.util.Constants.MR_JOB_RPC_ANALYSIS_SERVICE_NAME; + +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@Table("eaglejpa_analysis") +@ColumnFamily("f") +@Prefix("counter") +@Service(MR_JOB_RPC_ANALYSIS_SERVICE_NAME) +@TimeSeries(true) +@Partition({"site"}) +@Indexes({ + @Index(name = "Index_1_jobId", columns = { "jobId" }, unique = true), + @Index(name = "Index_2_jobDefId", columns = { "jobDefId" }, unique = false) + }) +public class JobRpcAnalysisAPIEntity extends TaggedLogAPIEntity { + @Column("a") + private String trackingUrl; + @Column("b") + private String currentState; + @Column("c") + private double totalOpsPerSecond; + @Column("d") + private double mapOpsPerSecond; + @Column("e") + private double reduceOpsPerSecond; + @Column("f") + private double avgOpsPerTask; + @Column("g") + private double avgOpsPerMap; + @Column("h") + private double avgOpsPerReduce; + + public String getTrackingUrl() { + return trackingUrl; + } + + public void setTrackingUrl(String trackingUrl) { + this.trackingUrl = trackingUrl; + valueChanged("trackingUrl"); + } + + public String getCurrentState() { + return currentState; + } + + public void setCurrentState(String currentState) { + this.currentState = currentState; + valueChanged("currentState"); + } + + public double getTotalOpsPerSecond() { + return totalOpsPerSecond; + } + + public void setTotalOpsPerSecond(double totalOpsPerSecond) { + this.totalOpsPerSecond = totalOpsPerSecond; + valueChanged("totalOpsPerSecond"); + } + + public double getMapOpsPerSecond() { + return mapOpsPerSecond; + } + + public void setMapOpsPerSecond(double mapOpsPerSecond) { + this.mapOpsPerSecond = mapOpsPerSecond; + valueChanged("mapOpsPerSecond"); + } + + public double 
getReduceOpsPerSecond() { + return reduceOpsPerSecond; + } + + public void setReduceOpsPerSecond(double reduceOpsPerSecond) { + this.reduceOpsPerSecond = reduceOpsPerSecond; + valueChanged("reduceOpsPerSecond"); + } + + public double getAvgOpsPerTask() { + return avgOpsPerTask; + } + + public void setAvgOpsPerTask(double avgOpsPerTask) { + this.avgOpsPerTask = avgOpsPerTask; + valueChanged("avgOpsPerTask"); + } + + public double getAvgOpsPerMap() { + return avgOpsPerMap; + } + + public void setAvgOpsPerMap(double avgOpsPerMap) { + this.avgOpsPerMap = avgOpsPerMap; + valueChanged("avgOpsPerMap"); + } + + public double getAvgOpsPerReduce() { + return avgOpsPerReduce; + } + + public void setAvgOpsPerReduce(double avgOpsPerReduce) { + this.avgOpsPerReduce = avgOpsPerReduce; + valueChanged("avgOpsPerReduce"); + } + +} + http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobSuggestionAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobSuggestionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobSuggestionAPIEntity.java index e433672..cc38cc4 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobSuggestionAPIEntity.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobSuggestionAPIEntity.java @@ -26,7 +26,7 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize; import static org.apache.eagle.jpm.util.Constants.MR_JOB_OPTIMIZER_SUGGESTION_SERVICE_NAME; @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) -@Table("eaglejpa") +@Table("eaglejpa_analysis") @ColumnFamily("f") @Prefix("jsuggestion") @Service(MR_JOB_OPTIMIZER_SUGGESTION_SERVICE_NAME) 
http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java index 4d02e91..beb1887 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java @@ -22,11 +22,7 @@ import org.apache.eagle.app.environment.impl.StormEnvironment; import org.apache.eagle.app.messaging.StormStreamSink; import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter; import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilterBuilder; -import org.apache.eagle.jpm.mr.history.crawler.JobHistorySpoutCollectorInterceptor; -import org.apache.eagle.jpm.mr.history.publisher.JobStreamPublisher; -import org.apache.eagle.jpm.mr.history.publisher.StreamPublisher; -import org.apache.eagle.jpm.mr.history.publisher.StreamPublisherManager; -import org.apache.eagle.jpm.mr.history.publisher.TaskAttemptStreamPublisher; +import org.apache.eagle.jpm.mr.history.publisher.*; import org.apache.eagle.jpm.mr.history.storm.JobHistorySpout; import org.apache.eagle.jpm.util.Constants; @@ -90,8 +86,9 @@ public class MRHistoryJobApplication extends StormApplication { taskAttemptKafkaBoltDeclarer.shuffleGrouping(spoutName, spoutToTaskAttemptSinkName); List<StreamPublisher> streamPublishers = new ArrayList<>(); - streamPublishers.add(new JobStreamPublisher(spoutToJobSinkName)); + //streamPublishers.add(new JobStreamPublisher(spoutToJobSinkName)); streamPublishers.add(new TaskAttemptStreamPublisher(spoutToTaskAttemptSinkName)); + 
streamPublishers.add(new JobRpcAnalysisStreamPublisher(spoutToJobSinkName)); jobHistorySpout.setStreamPublishers(streamPublishers); return topologyBuilder.createTopology(); http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java index 2e56632..845aa77 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java @@ -119,7 +119,7 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver { } } jobHistoryFiles.clear(); - LOG.info("after filtering, number of job history files " + processQueue.size()); + LOG.info("after filtering, number of job history files " + allJobHistoryFiles.size()); } Collections.sort(allJobHistoryFiles, http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java index 6020a3b..0f3dd23 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java +++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java @@ -18,7 +18,6 @@ package org.apache.eagle.jpm.mr.history.metrics; -import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig; import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity; import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.jpm.util.MRJobTagName; http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java index ea0ba30..d9f52a3 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java @@ -30,6 +30,8 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import static org.apache.eagle.jpm.util.MRJobTagName.TASK_TYPE; + public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleListener { private static final Logger LOG = LoggerFactory.getLogger(JobEntityLifecycleAggregator.class); private JobExecutionAPIEntity jobExecutionAPIEntity; @@ -107,7 +109,7 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi private void taskAttemptEntityCreated(TaskAttemptExecutionAPIEntity entity) { JobCounters jobCounters = entity.getJobCounters(); - String taskType = entity.getTags().get(Constants.JOB_TASK_TYPE_TAG); + String taskType = entity.getTags().get(TASK_TYPE.toString()); if (taskType != null && jobCounters != null && 
jobCounters.getCounters() != null) { if (Constants.TaskType.MAP.toString().equals(taskType.toUpperCase())) { http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java index b174a2f..67c1ec9 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java @@ -20,33 +20,38 @@ package org.apache.eagle.jpm.mr.history.parser; import com.typesafe.config.Config; import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity; import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer; -import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity; -import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity; -import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity; -import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity; +import org.apache.eagle.jpm.analyzer.mr.rpc.JobRpcEvaluator; +import org.apache.eagle.jpm.analyzer.publisher.Result; +import org.apache.eagle.jpm.mr.history.publisher.StreamPublisher; +import org.apache.eagle.jpm.mr.history.publisher.StreamPublisherManager; +import org.apache.eagle.jpm.mr.historyentity.*; import org.apache.eagle.jpm.util.MRJobTagName; import org.apache.eagle.jpm.util.jobcounter.JobCounters; -import org.apache.eagle.jpm.mr.history.parser.JHFEventReaderBase.Keys; +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; +import java.util.List; + import static org.apache.eagle.jpm.util.MRJobTagName.TASK_ATTEMPT_ID; import static org.apache.eagle.jpm.util.MRJobTagName.TASK_ID; /* - * JobEventCounterListener provides an interface to add job/task counter analyzers + * JobSuggestionListener provides an interface to analyze job performance and alerts by email */ public class JobSuggestionListener implements HistoryJobEntityCreationListener { private static final Logger LOG = LoggerFactory.getLogger(JobSuggestionListener.class); private MapReduceAnalyzerEntity info; private MRJobPerformanceAnalyzer<MapReduceAnalyzerEntity> analyzer; + private StreamPublisher streamPublisher; public JobSuggestionListener(Config config) { this.info = new MapReduceAnalyzerEntity(); this.analyzer = new MRJobPerformanceAnalyzer<>(config); + this.streamPublisher = StreamPublisherManager.getInstance().getStreamPublisher(JobRpcAnalysisAPIEntity.class); } @Override @@ -75,6 +80,7 @@ public class JobSuggestionListener implements HistoryJobEntityCreationListener { info.setTotalMaps(jobExecutionAPIEntity.getNumTotalMaps()); info.setTotalReduces(jobExecutionAPIEntity.getNumTotalReduces()); info.setProgress(100); + info.setTrackingUrl(jobExecutionAPIEntity.getTrackingUrl()); } } @@ -90,6 +96,16 @@ public class JobSuggestionListener implements HistoryJobEntityCreationListener { @Override public void flush() throws Exception { - analyzer.analyze(info); + Result result = analyzer.analyze(info); /* the analysis itself runs whether or not a stream publisher is registered */ + if (streamPublisher == null) { + LOG.warn("JobRpcAnalysisStreamPublisher is null"); + } else if (result != null) { /* a null result leaves nothing to forward; it is not a publisher problem, so no publisher warning */ + List<TaggedLogAPIEntity> entities = result.getAlertEntities().get(JobRpcEvaluator.class.getName()); + if (entities != null) { + for (TaggedLogAPIEntity entity : entities) { + streamPublisher.flush(entity); + } + } + } } } 
http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobRpcAnalysisStreamPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobRpcAnalysisStreamPublisher.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobRpcAnalysisStreamPublisher.java new file mode 100644 index 0000000..3b91fbf --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobRpcAnalysisStreamPublisher.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.jpm.mr.history.publisher; + +import org.apache.eagle.dataproc.impl.storm.ValuesArray; +import org.apache.eagle.jpm.mr.historyentity.JobRpcAnalysisAPIEntity; +import org.apache.eagle.jpm.util.MRJobTagName; + +import java.util.HashMap; +import java.util.Map; + + +public class JobRpcAnalysisStreamPublisher extends StreamPublisher<JobRpcAnalysisAPIEntity> { + + public JobRpcAnalysisStreamPublisher(String stormStreamId) { + super(stormStreamId); + } + + @Override + public Class<?> type() { + return JobRpcAnalysisAPIEntity.class; + } + + @Override + public void flush(JobRpcAnalysisAPIEntity entity) { + Map<String, Object> fields = new HashMap<>(entity.getTags()); + fields.put("trackingUrl", entity.getTrackingUrl()); + fields.put("totalOpsPerSecond", entity.getTotalOpsPerSecond()); + fields.put("mapOpsPerSecond", entity.getMapOpsPerSecond()); + fields.put("reduceOpsPerSecond", entity.getReduceOpsPerSecond()); + fields.put("avgOpsPerTask", entity.getAvgOpsPerTask()); + fields.put("avgOpsPerMap", entity.getAvgOpsPerMap()); + fields.put("avgOpsPerReduce", entity.getAvgOpsPerReduce()); + fields.put("currentState", entity.getCurrentState()); + + collector.collect(stormStreamId, new ValuesArray(fields.get(MRJobTagName.JOB_ID.toString()), fields)); + } + + +} http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobStreamPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobStreamPublisher.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobStreamPublisher.java index 92c415c..a325cbe 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobStreamPublisher.java +++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobStreamPublisher.java @@ -19,7 +19,6 @@ package org.apache.eagle.jpm.mr.history.publisher; import org.apache.eagle.dataproc.impl.storm.ValuesArray; -import org.apache.eagle.jpm.mr.history.crawler.EagleOutputCollector; import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity; import org.apache.eagle.jpm.util.MRJobTagName; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml index 5c479a3..01c5e59 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml @@ -192,20 +192,38 @@ <type>string</type> </column> <column> - <name>submissionTime</name> - <type>long</type> - </column> - <column> <name>trackingUrl</name> <type>string</type> </column> <column> - <name>startTime</name> - <type>long</type> + <name>totalOpsPerSecond</name> + <type>double</type> + <defaultValue>0.0</defaultValue> </column> <column> - <name>endTime</name> - <type>long</type> + <name>mapOpsPerSecond</name> + <type>double</type> + <defaultValue>0.0</defaultValue> + </column> + <column> + <name>reduceOpsPerSecond</name> + <type>double</type> + <defaultValue>0.0</defaultValue> + </column> + <column> + <name>avgOpsPerTask</name> + <type>double</type> + 
<defaultValue>0.0</defaultValue> + </column> + <column> + <name>avgOpsPerMap</name> <!-- name must equal the field key emitted by JobRpcAnalysisStreamPublisher.flush --> + <type>double</type> + <defaultValue>0.0</defaultValue> + </column> + <column> + <name>avgOpsPerReduce</name> <!-- name must equal the field key emitted by JobRpcAnalysisStreamPublisher.flush --> + <type>double</type> + <defaultValue>0.0</defaultValue> </column> <column> <name>currentState</name> http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java index 8c12633..420b24c 100644 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java @@ -46,7 +46,7 @@ public class JHFSparkParser implements JHFParserBase { for (String line = reader.readLine(); line != null; line = reader.readLine()) { isValidJson = true; JSONObject eventObj = parseAndValidateJSON(line); - if (isValidJson) { + if (isValidJson && eventObj != null) { this.eventReader.read(eventObj); } } http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java index fee0425..f7bb74e 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java @@ -128,10 +128,9 @@ public class Constants { public 
static final String MR_RUNNING_TASK_ATTEMPT_EXECUTION_SERVICE_NAME = "RunningTaskAttemptExecutionService"; public static final String MR_JOB_PROCESS_TIME_STAMP_NAME = "JobProcessTimeStampService"; public static final String MR_JOB_OPTIMIZER_SUGGESTION_SERVICE_NAME = "JobOptimizerSuggestionService"; + public static final String MR_JOB_RPC_ANALYSIS_SERVICE_NAME = "JobRpcAnalysisService"; public static final String ACCEPTED_APP_SERVICE_NAME = "AcceptedAppService"; - public static final String JOB_TASK_TYPE_TAG = "taskType"; - public static class JobConfiguration { // job type public static final String SCOOBI_JOB = "scoobi.mode"; http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java index 9811772..9b0f79f 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java @@ -43,7 +43,6 @@ public enum MRJobTagName { } public String toString() { - return this.tagName; } } http://git-wip-us.apache.org/repos/asf/eagle/blob/43bd197d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java index e337c13..66cedc3 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java @@ -56,6 
+56,14 @@ public final class JobCounters implements Serializable { } } + public Long getCounterValue(String groupName, String counterName) { + if (counters.containsKey(groupName) && counters.get(groupName).containsKey(counterName)) { + return counters.get(groupName).get(counterName); + } else { + return 0L; + } + } + public static enum GroupName { FileSystemCounters("org.apache.hadoop.mapreduce.FileSystemCounter", "FileSystemCounters"), MapReduceTaskCounter("org.apache.hadoop.mapreduce.TaskCounter", "MapReduceTaskCounter"), @@ -80,10 +88,15 @@ public final class JobCounters implements Serializable { public static enum CounterName { - FILE_BYTES_READ(GroupName.FileSystemCounters, "FILE_BYTES_READ", "FILE_BYTES_READ"), - FILE_BYTES_WRITTEN(GroupName.FileSystemCounters, "FILE_BYTES_WRITTEN", "FILE_BYTES_WRITTEN"), - HDFS_BYTES_READ(GroupName.FileSystemCounters, "HDFS_BYTES_READ", "HDFS_BYTES_READ"), - HDFS_BYTES_WRITTEN(GroupName.FileSystemCounters, "HDFS_BYTES_WRITTEN", "HDFS_BYTES_WRITTEN"), + FILE_BYTES_READ(GroupName.FileSystemCounters, "FILE_BYTES_READ", "File read bytes"), + FILE_BYTES_WRITTEN(GroupName.FileSystemCounters, "FILE_BYTES_WRITTEN", "File written bytes"), + HDFS_BYTES_READ(GroupName.FileSystemCounters, "HDFS_BYTES_READ", "HDFS read bytes"), + HDFS_BYTES_WRITTEN(GroupName.FileSystemCounters, "HDFS_BYTES_WRITTEN", "HDFS written bytes"), + HDFS_READ_OPS(GroupName.FileSystemCounters, "HDFS_READ_OPS", "HDFS read ops"), + HDFS_WRITE_OPS(GroupName.FileSystemCounters, "HDFS_WRITE_OPS", "HDFS write ops"), + + MILLIS_MAPS(GroupName.MapReduceJobCounter, "MILLIS_MAPS", "total maps mills"), + MILLIS_REDUCES(GroupName.MapReduceJobCounter, "MILLIS_REDUCES", "total reduce mills"), MAP_INPUT_RECORDS(GroupName.MapReduceTaskCounter, "MAP_INPUT_RECORDS", "Map input records"), MAP_OUTPUT_RECORDS(GroupName.MapReduceTaskCounter, "MAP_OUTPUT_RECORDS", "Map output records"),