[EAGLE-524] aggregation framework-job level metrics aggregation Author: wujinhu <wujinhu...@126.com>
Closes #419 from wujinhu/aggregation. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/8774b85c Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/8774b85c Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/8774b85c Branch: refs/heads/master Commit: 8774b85cd97b7b4c386c1b04cff36c13b3bb82d3 Parents: b66e27b Author: wujinhu <wujinhu...@126.com> Authored: Tue Sep 6 17:22:42 2016 +0800 Committer: Qingwen Zhao <qingwen...@gmail.com> Committed: Tue Sep 6 17:22:42 2016 +0800 ---------------------------------------------------------------------- .../jpm/mr/history/MRHistoryJobApplication.java | 2 +- .../jpm/mr/history/MRHistoryJobConfig.java | 24 +--- .../crawler/DefaultJHFInputStreamCallback.java | 11 +- .../history/crawler/JHFCrawlerDriverImpl.java | 12 +- .../metrics/JobCountMetricsGenerator.java | 18 +-- .../metrics/JobCounterMetricsGenerator.java | 133 +++++++++++++++++++ .../JobExecutionMetricsCreationListener.java | 4 +- .../mr/history/parser/JHFEventReaderBase.java | 16 ++- .../mr/history/parser/JHFMRVer1EventReader.java | 5 +- .../mr/history/parser/JHFMRVer2EventReader.java | 5 +- .../jpm/mr/history/parser/JHFParserFactory.java | 22 +-- ...JobConfigurationCreationServiceListener.java | 16 +-- .../JobEntityCreationEagleServiceListener.java | 22 ++- .../parser/TaskAttemptCounterListener.java | 16 +-- .../mr/history/parser/TaskFailureListener.java | 16 +-- .../jpm/mr/history/storm/JobHistorySpout.java | 37 +++--- .../org/apache/eagle/jpm/util/Constants.java | 2 + 17 files changed, 228 insertions(+), 133 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java index 08607a1..beec938 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java @@ -67,7 +67,7 @@ public class MRHistoryJobApplication extends StormApplication { } topologyBuilder.setSpout( spoutName, - new JobHistorySpout(filter, appConfig), + new JobHistorySpout(filter, config), parallelism ).setNumTasks(tasks); return topologyBuilder.createTopology(); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java index c0943de..4ac875b 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java @@ -127,29 +127,19 @@ public class MRHistoryJobConfig implements Serializable { this.controlConfig = new ControlConfig(); this.jobExtractorConfig = new JobExtractorConfig(); this.eagleServiceConfig = new EagleServiceConfig(); - } - - public static MRHistoryJobConfig getInstance(String[] args) { - manager.init(args); - return manager; + this.config = null; } public static MRHistoryJobConfig getInstance(Config config) { - manager.init(config); + if (config != null && manager.config == null) { + manager.init(config); + } + return manager; } - /** - * read configuration file and load hbase config etc. - */ - private void init(String[] args) { - // TODO: Probably we can remove the properties file path check in future - try { - LOG.info("Loading from configuration file"); - init(new ConfigOptionParser().load(args)); - } catch (Exception e) { - LOG.error("failed to load config"); - } + public static MRHistoryJobConfig get() { + return getInstance(null); } /** http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java index 87cd4e0..14b93af 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java @@ -33,20 +33,17 @@ public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback { private JobHistoryContentFilter filter; - private MRHistoryJobConfig configManager; - public DefaultJHFInputStreamCallback(JobHistoryContentFilter filter, MRHistoryJobConfig configManager, EagleOutputCollector eagleCollector) { + public DefaultJHFInputStreamCallback(JobHistoryContentFilter filter, EagleOutputCollector eagleCollector) { this.filter = filter; - this.configManager = configManager; } @Override public void onInputStream(InputStream jobFileInputStream, org.apache.hadoop.conf.Configuration conf) throws Exception { - final MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig(); @SuppressWarnings("serial") Map<String, String> baseTags = new HashMap<String, String>() { { - put("site", jobExtractorConfig.site); + put("site", MRHistoryJobConfig.get().getJobExtractorConfig().site); } }; @@ -55,9 +52,7 @@ public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback { jobFileInputStream.close(); } else { //get parser and parse, do not need to emit data now - JHFParserBase parser = JHFParserFactory.getParser(configManager, - baseTags, - conf, filter); + JHFParserBase parser = JHFParserFactory.getParser(baseTags, conf, filter); parser.parse(jobFileInputStream); jobFileInputStream.close(); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java index 2f326fe..55ffc19 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java @@ -65,12 +65,10 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver { private TimeZone timeZone; private JobCountMetricsGenerator jobCountMetricsGenerator; - public JHFCrawlerDriverImpl(MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig, - MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig, - MRHistoryJobConfig.ControlConfig controlConfig, JHFInputStreamCallback reader, + public JHFCrawlerDriverImpl(JHFInputStreamCallback reader, JobHistoryLCM historyLCM, JobIdFilter jobFilter, int partitionId) throws Exception { - this.zeroBasedMonth = controlConfig.zeroBasedMonth; - this.dryRun = controlConfig.dryRun; + this.zeroBasedMonth = MRHistoryJobConfig.get().getControlConfig().zeroBasedMonth; + this.dryRun = MRHistoryJobConfig.get().getControlConfig().dryRun; if (this.dryRun) { LOG.info("this is a dry run"); } @@ -78,8 +76,8 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver { jhfLCM = historyLCM;//new JobHistoryDAOImpl(jobHistoryConfig); this.partitionId = partitionId; this.jobFilter = jobFilter; - timeZone = TimeZone.getTimeZone(controlConfig.timeZone); - jobCountMetricsGenerator = new JobCountMetricsGenerator(eagleServiceConfig, jobExtractorConfig, timeZone); + timeZone = TimeZone.getTimeZone(MRHistoryJobConfig.get().getControlConfig().timeZone); + jobCountMetricsGenerator = new JobCountMetricsGenerator(timeZone); } /** http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java index 0e0e5e9..642170d 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java @@ -34,15 +34,9 @@ import java.util.*; public class JobCountMetricsGenerator { private static final Logger LOG = LoggerFactory.getLogger(JobCountMetricsGenerator.class); - private MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig; - private MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig; private TimeZone timeZone; - public JobCountMetricsGenerator(MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig, - MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig, - TimeZone timeZone) { - this.eagleServiceConfig = eagleServiceConfig; - this.jobExtractorConfig = jobExtractorConfig; + public JobCountMetricsGenerator(TimeZone timeZone) { this.timeZone = timeZone; } @@ -57,10 +51,10 @@ public class JobCountMetricsGenerator { } IEagleServiceClient client = new EagleServiceClientImpl( - eagleServiceConfig.eagleServiceHost, - eagleServiceConfig.eagleServicePort, - eagleServiceConfig.username, - eagleServiceConfig.password); + MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost, + MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort, + MRHistoryJobConfig.get().getEagleServiceConfig().username, + MRHistoryJobConfig.get().getEagleServiceConfig().password); GregorianCalendar cal = new GregorianCalendar(year, month, day); @@ -72,7 +66,7 @@ public class JobCountMetricsGenerator { @SuppressWarnings("serial") Map<String, String> baseTags = new HashMap<String, String>() { { - put("site", jobExtractorConfig.site); + put("site", MRHistoryJobConfig.get().getJobExtractorConfig().site); } }; metricEntity.setTags(baseTags); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java new file mode 100644 index 0000000..6291b37 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.eagle.jpm.mr.history.metrics; + +import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig; +import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity; +import org.apache.eagle.jpm.util.Constants; +import org.apache.eagle.jpm.util.MRJobTagName; +import org.apache.eagle.jpm.util.jobcounter.JobCounters; +import org.apache.eagle.log.entity.GenericMetricEntity; +import org.apache.eagle.service.client.IEagleServiceClient; +import org.apache.eagle.service.client.impl.EagleServiceClientImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class JobCounterMetricsGenerator { + private static final Logger LOG = LoggerFactory.getLogger(JobCounterMetricsGenerator.class); + private static final int BATCH_SIZE = 1000; + + private List<List<GenericMetricEntity>> metricEntities = new ArrayList<>(); + //metric, time, value + private Map<String, Map<Long, Long>> metricValueByMinute = new HashMap<>(); + + private List<GenericMetricEntity> lastEntitiesBatch; + private Map<String, String> baseTags; + + public JobCounterMetricsGenerator() { + this.lastEntitiesBatch = null; + } + + public void setBaseTags(Map<String, String> tags) { + this.baseTags = tags; + } + + public void taskExecutionEntityCreated(TaskExecutionAPIEntity taskExecutionAPIEntity) { + JobCounters jobCounters = taskExecutionAPIEntity.getJobCounters(); + if (jobCounters == null || jobCounters.getCounters() == null) { + LOG.warn("found null job counters, task {}", taskExecutionAPIEntity.getTags().get(MRJobTagName.TASK_ID.toString())); + return; + } + + long duration = taskExecutionAPIEntity.getDuration(); + long startTime = taskExecutionAPIEntity.getStartTime(); + long endTime = taskExecutionAPIEntity.getEndTime(); + + Map<String, Map<String, Long>> counters = jobCounters.getCounters(); + for (String groupName : counters.keySet()) { + Map<String, Long> metricValues = counters.get(groupName); + for (String metric : metricValues.keySet()) { + if (!metricValueByMinute.containsKey(metric)) { + metricValueByMinute.put(metric, new HashMap<>()); + } + Long value = metricValues.get(metric); + double avg = value * 1.0 / duration; + for (long i = startTime; i <= endTime;) { + long timeStamp = i / 60000L * 60000L; + if (!metricValueByMinute.get(metric).containsKey(timeStamp)) { + metricValueByMinute.get(metric).put(timeStamp, 0L); + } + long valueByEachMinute = metricValueByMinute.get(metric).get(timeStamp); + if (endTime >= timeStamp + 60000L) { + metricValueByMinute.get(metric).put(timeStamp, valueByEachMinute + (long)(avg * (timeStamp + 60000L - i))); + } else { + metricValueByMinute.get(metric).put(timeStamp, valueByEachMinute + (long)(avg * (endTime - timeStamp))); + } + + i = timeStamp + 60000L; + } + } + } + } + + private String buildMetricName(String field) { + return String.format(Constants.HADOOP_HISTORY_MINUTE_METRIC_FORMAT, Constants.JOB_LEVEL, field); + } + + public void flush() throws Exception { + for (String metric : metricValueByMinute.keySet()) { + Map<Long, Long> valueByMinute = metricValueByMinute.get(metric); + for (Long timeStamp : valueByMinute.keySet()) { + GenericMetricEntity metricEntity = new GenericMetricEntity(); + metricEntity.setTimestamp(timeStamp); + metricEntity.setPrefix(buildMetricName(metric.toLowerCase())); + metricEntity.setValue(new double[] {valueByMinute.get(timeStamp)}); + metricEntity.setTags(this.baseTags); + + if (this.lastEntitiesBatch == null || this.lastEntitiesBatch.size() > BATCH_SIZE) { + this.lastEntitiesBatch = new ArrayList<>(); + metricEntities.add(this.lastEntitiesBatch); + } + + this.lastEntitiesBatch.add(metricEntity); + } + } + + IEagleServiceClient client = new EagleServiceClientImpl( + MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost, + MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort, + MRHistoryJobConfig.get().getEagleServiceConfig().username, + MRHistoryJobConfig.get().getEagleServiceConfig().password); + + for (List<GenericMetricEntity> entities : metricEntities) { + LOG.info("start flushing entities of total number " + entities.size()); + client.create(entities); + LOG.info("finish flushing entities of total number " + entities.size()); + entities.clear(); + } + client.getJerseyClient().destroy(); + client.close(); + metricEntities.clear(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java index 2129bed..d7e8fcc 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java @@ -57,7 +57,7 @@ public class JobExecutionMetricsCreationListener extends AbstractMetricsCreation for (Map<String, Long> metricGroup : jobCounters.getCounters().values()) { for (Map.Entry<String, Long> entry : metricGroup.entrySet()) { String metricName = entry.getKey().toLowerCase(); - metrics.add(metricWrapper(timeStamp, "history." + metricName, new double[]{entry.getValue()}, tags)); + metrics.add(metricWrapper(timeStamp, metricName, new double[]{entry.getValue()}, tags)); } } } @@ -67,7 +67,7 @@ public class JobExecutionMetricsCreationListener extends AbstractMetricsCreation @Override public String buildMetricName(String field) { - return String.format(Constants.hadoopMetricFormat, Constants.JOB_LEVEL, field); + return String.format(Constants.HADOOP_HISTORY_TOTAL_METRIC_FORMAT, Constants.JOB_LEVEL, field); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java index 1570956..d33c26b 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java @@ -18,9 +18,9 @@ package org.apache.eagle.jpm.mr.history.parser; -import org.apache.commons.io.FilenameUtils; -import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.JobHistoryEndpointConfig; +import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig; import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter; +import org.apache.eagle.jpm.mr.history.metrics.JobCounterMetricsGenerator; import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys; import org.apache.eagle.jpm.mr.historyentity.*; import org.apache.eagle.jpm.util.Constants; @@ -69,7 +69,6 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl protected String queueName; protected Long jobLaunchTime; protected JobHistoryContentFilter filter; - private JobHistoryEndpointConfig jobHistoryEndpointConfig; protected final List<HistoryJobEntityLifecycleListener> jobEntityLifecycleListeners = new ArrayList<>(); @@ -78,6 +77,8 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl private long sumMapTaskDuration; private long sumReduceTaskDuration; + private JobCounterMetricsGenerator jobCounterMetricsGenerator; + public Constants.JobType fetchJobType(Configuration config) { if (config.get(Constants.JobConfiguration.CASCADING_JOB) != null) { return Constants.JobType.CASCADING; @@ -101,9 +102,8 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl * * @param baseTags */ - public JHFEventReaderBase(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter, JobHistoryEndpointConfig jobHistoryEndpointConfig) { + public JHFEventReaderBase(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) { this.filter = filter; - this.jobHistoryEndpointConfig = jobHistoryEndpointConfig; this.baseTags = baseTags; jobSubmitEventEntity = new JobEventAPIEntity(); @@ -134,6 +134,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl } this.sumMapTaskDuration = 0L; this.sumReduceTaskDuration = 0L; + this.jobCounterMetricsGenerator = new JobCounterMetricsGenerator(); } public void register(HistoryJobEntityLifecycleListener lifecycleListener) { @@ -148,6 +149,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl } try { flush(); + this.jobCounterMetricsGenerator.flush(); } catch (Exception ex) { throw new IOException(ex); } @@ -162,7 +164,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl } private String buildJobTrackingUrl(String jobId) { - String jobTrackingUrlBase = this.jobHistoryEndpointConfig.mrHistoryServerUrl + "/jobhistory/job/"; + String jobTrackingUrlBase = MRHistoryJobConfig.getInstance(null).getJobHistoryEndpointConfig().mrHistoryServerUrl + "/jobhistory/job/"; try { URI oldUri = new URI(jobTrackingUrlBase); URI resolved = oldUri.resolve(jobId); @@ -303,6 +305,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl } else { jobExecutionEntity.setAvgReduceTaskDuration(this.sumReduceTaskDuration * 1.0 / numTotalReduces); } + this.jobCounterMetricsGenerator.setBaseTags(jobExecutionEntity.getTags()); entityCreated(jobExecutionEntity); } } @@ -401,6 +404,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl } entityCreated(entity); + this.jobCounterMetricsGenerator.taskExecutionEntityCreated(entity); //_taskStartTime.remove(taskID); // clean this taskID } else if ((recType == RecordTypes.MapAttempt || recType == RecordTypes.ReduceAttempt) && startTime != null) { // task attempt start taskAttemptStartTime.put(taskAttemptID, Long.valueOf(startTime)); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java index 0e9458a..e20836f 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java @@ -18,7 +18,6 @@ package org.apache.eagle.jpm.mr.history.parser; -import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.JobHistoryEndpointConfig; import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter; import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys; import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity; @@ -49,8 +48,8 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer * * @param baseTags */ - public JHFMRVer1EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter, JobHistoryEndpointConfig jobHistoryEndpointConfig) { - super(baseTags, configuration, filter, jobHistoryEndpointConfig); + public JHFMRVer1EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) { + super(baseTags, configuration, filter); } @Override http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java index 74f84f6..0919aa0 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java @@ -18,7 +18,6 @@ package org.apache.eagle.jpm.mr.history.parser; -import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.JobHistoryEndpointConfig; import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter; import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys; import org.apache.eagle.jpm.util.jobcounter.JobCounters; @@ -44,8 +43,8 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase { * * @throws IOException */ - public JHFMRVer2EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter, JobHistoryEndpointConfig jobHistoryEndpointConfig) { - super(baseTags, configuration, filter, jobHistoryEndpointConfig); + public JHFMRVer2EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) { + super(baseTags, configuration, filter); } @SuppressWarnings("deprecation") http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java index 386d50c..56fd956 100755 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java @@ -30,8 +30,8 @@ public class JHFParserFactory { private static final Logger LOG = LoggerFactory.getLogger(JHFParserFactory.class); - public static JHFParserBase getParser(MRHistoryJobConfig configManager, Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) { - String format = configManager.getJobExtractorConfig().mrVersion; + public static JHFParserBase getParser(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) { + String format = MRHistoryJobConfig.get().getJobExtractorConfig().mrVersion; JHFParserBase parser; JHFFormat f; try { @@ -46,21 +46,21 @@ public class JHFParserFactory { switch (f) { case MRVer2: - JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter, configManager.getJobHistoryEndpointConfig()); - reader2.addListener(new JobEntityCreationEagleServiceListener(configManager)); - reader2.addListener(new TaskFailureListener(configManager)); - reader2.addListener(new TaskAttemptCounterListener(configManager)); - reader2.addListener(new JobConfigurationCreationServiceListener(configManager)); + JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter); + reader2.addListener(new JobEntityCreationEagleServiceListener()); + reader2.addListener(new TaskFailureListener()); + reader2.addListener(new TaskAttemptCounterListener()); + reader2.addListener(new JobConfigurationCreationServiceListener()); reader2.register(new JobEntityLifecycleAggregator()); parser = new JHFMRVer2Parser(reader2); break; case MRVer1: default: - JHFMRVer1EventReader reader1 = new JHFMRVer1EventReader(baseTags, configuration, filter, configManager.getJobHistoryEndpointConfig()); - reader1.addListener(new JobEntityCreationEagleServiceListener(configManager)); - reader1.addListener(new TaskFailureListener(configManager)); - reader1.addListener(new TaskAttemptCounterListener(configManager)); + JHFMRVer1EventReader reader1 = new JHFMRVer1EventReader(baseTags, configuration, filter); + reader1.addListener(new JobEntityCreationEagleServiceListener()); + reader1.addListener(new TaskFailureListener()); + reader1.addListener(new TaskAttemptCounterListener()); reader1.register(new JobEntityLifecycleAggregator()); parser = new JHFMRVer1Parser(reader1); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java index 7293c89..bf93432 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java @@ -32,11 +32,9 @@ import java.util.List; public class JobConfigurationCreationServiceListener implements HistoryJobEntityLifecycleListener { private static final Logger logger = LoggerFactory.getLogger(JobConfigurationCreationServiceListener.class); private static final int MAX_RETRY_TIMES = 3; - private MRHistoryJobConfig configManager; private JobConfigurationAPIEntity jobConfigurationEntity; - public JobConfigurationCreationServiceListener(MRHistoryJobConfig configManager) { - this.configManager = configManager; + public JobConfigurationCreationServiceListener() { } @Override @@ -55,15 +53,13 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity @Override public void flush() throws Exception { - MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig(); - MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig(); IEagleServiceClient client = new EagleServiceClientImpl( - eagleServiceConfig.eagleServiceHost, - eagleServiceConfig.eagleServicePort, - eagleServiceConfig.username, - eagleServiceConfig.password); + MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost, + MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort, + MRHistoryJobConfig.get().getEagleServiceConfig().username, + MRHistoryJobConfig.get().getEagleServiceConfig().password); - client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000); + client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getJobExtractorConfig().readTimeoutSeconds * 1000); List<JobConfigurationAPIEntity> list = new ArrayList<>(); list.add(jobConfigurationEntity); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java index 623a776..74368a5 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java @@ -40,7 +40,6 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr private static final int BATCH_SIZE = 1000; private int batchSize; private List<JobBaseAPIEntity> list = new ArrayList<>(); - private MRHistoryJobConfig configManager; List<JobExecutionAPIEntity> jobs = new ArrayList<>(); List<JobEventAPIEntity> jobEvents = new ArrayList<>(); List<TaskExecutionAPIEntity> taskExecs = new ArrayList<>(); @@ -48,17 +47,16 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr private JobExecutionMetricsCreationListener jobExecutionMetricsCreationListener = new JobExecutionMetricsCreationListener(); private TimeZone timeZone; - public JobEntityCreationEagleServiceListener(MRHistoryJobConfig configManager) { - this(configManager, BATCH_SIZE); + public JobEntityCreationEagleServiceListener() { + this(BATCH_SIZE); } - public JobEntityCreationEagleServiceListener(MRHistoryJobConfig configManager, int batchSize) { - this.configManager = configManager; + public JobEntityCreationEagleServiceListener(int batchSize) { if (batchSize <= 0) { throw new IllegalArgumentException("batchSize must be greater than 0 when it is provided"); } this.batchSize = batchSize; - timeZone = TimeZone.getTimeZone(configManager.getControlConfig().timeZone); + timeZone = TimeZone.getTimeZone(MRHistoryJobConfig.get().getControlConfig().timeZone); } @Override @@ -84,15 +82,13 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr */ @Override public void flush() throws Exception { - MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig(); - MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig(); IEagleServiceClient client = new EagleServiceClientImpl( - eagleServiceConfig.eagleServiceHost, - eagleServiceConfig.eagleServicePort, - eagleServiceConfig.username, - eagleServiceConfig.password); + MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost, + MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort, + MRHistoryJobConfig.get().getEagleServiceConfig().username, + MRHistoryJobConfig.get().getEagleServiceConfig().password); - client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000); + client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getJobExtractorConfig().readTimeoutSeconds * 1000); logger.info("start flushing entities of total number " + list.size()); List<GenericMetricEntity> metricEntities = new ArrayList<>(); for (int i = 0; i < list.size(); i++) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java index efc43c5..ef7c8e9 100755 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java @@ -34,10 +34,8 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe private static final Logger logger = LoggerFactory.getLogger(TaskAttemptCounterListener.class); private static final int BATCH_SIZE = 1000; private Map<CounterKey, CounterValue> counters = new HashMap<>(); - private MRHistoryJobConfig configManager; - public TaskAttemptCounterListener(MRHistoryJobConfig configManager) { - this.configManager = configManager; + public TaskAttemptCounterListener() { } private static class CounterKey { @@ -112,15 +110,13 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe @Override public void flush() throws Exception { - MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig(); - MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig(); IEagleServiceClient client = new EagleServiceClientImpl( - eagleServiceConfig.eagleServiceHost, - eagleServiceConfig.eagleServicePort, - eagleServiceConfig.username, - eagleServiceConfig.password); + MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost, + MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort, + MRHistoryJobConfig.get().getEagleServiceConfig().username, + MRHistoryJobConfig.get().getEagleServiceConfig().password); - client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000); + client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getJobExtractorConfig().readTimeoutSeconds * 1000); List<TaskAttemptCounterAPIEntity> list = new ArrayList<>(); logger.info("start flushing TaskAttemptCounter entities of total number " + counters.size()); // create entity http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java index f95eaa2..1a7a5fc 100755 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java @@ -44,10 +44,8 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener { private final List<TaskFailureCountAPIEntity> failureTasks = new ArrayList<TaskFailureCountAPIEntity>(); private final MRErrorClassifier classifier; - private MRHistoryJobConfig configManager; - public TaskFailureListener(MRHistoryJobConfig configManager) { - this.configManager = configManager; + public TaskFailureListener() { InputStream is = null; try { is = TaskFailureListener.class.getClassLoader().getResourceAsStream(MR_ERROR_CATEGORY_CONFIG_FILE_NAME); @@ -109,15 +107,13 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener { @Override public void flush() throws Exception { - MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig(); - MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig(); IEagleServiceClient client = new EagleServiceClientImpl( - eagleServiceConfig.eagleServiceHost, - eagleServiceConfig.eagleServicePort, - eagleServiceConfig.username, - eagleServiceConfig.password); + MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost, + MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort, + MRHistoryJobConfig.get().getEagleServiceConfig().username, + MRHistoryJobConfig.get().getEagleServiceConfig().password); - client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000); + client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getJobExtractorConfig().readTimeoutSeconds * 1000); int tried = 0; while (tried <= MAX_RETRY_TIMES) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java index 04283d3..da98e0d 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java @@ -18,6 +18,7 @@ package org.apache.eagle.jpm.mr.history.storm; +import com.typesafe.config.Config; import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig; import org.apache.eagle.jpm.mr.history.crawler.*; import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager; @@ -91,22 +92,22 @@ public class JobHistorySpout extends BaseRichSpout { private JobHistoryContentFilter contentFilter; private JobHistorySpoutCollectorInterceptor interceptor; private JHFInputStreamCallback callback; - private MRHistoryJobConfig configManager; private JobHistoryLCM jhfLCM; private static final int MAX_RETRY_TIMES = 3; + private Config config; - public JobHistorySpout(JobHistoryContentFilter filter, MRHistoryJobConfig configManager) { - this(filter, configManager, new JobHistorySpoutCollectorInterceptor()); + public JobHistorySpout(JobHistoryContentFilter filter, Config config) { + this(filter, config, new JobHistorySpoutCollectorInterceptor()); } /** * mostly this constructor signature is for unit test purpose as you can put customized interceptor here. */ - public JobHistorySpout(JobHistoryContentFilter filter, MRHistoryJobConfig configManager, JobHistorySpoutCollectorInterceptor adaptor) { + public JobHistorySpout(JobHistoryContentFilter filter, Config config, JobHistorySpoutCollectorInterceptor adaptor) { this.contentFilter = filter; - this.configManager = configManager; + this.config = config; this.interceptor = adaptor; - callback = new DefaultJHFInputStreamCallback(contentFilter, configManager, interceptor); + callback = new DefaultJHFInputStreamCallback(contentFilter, interceptor); } private int calculatePartitionId(TopologyContext context) { @@ -127,13 +128,14 @@ public class JobHistorySpout extends BaseRichSpout { @Override public void open(Map conf, TopologyContext context, final SpoutOutputCollector collector) { + MRHistoryJobConfig.getInstance(config); partitionId = calculatePartitionId(context); // sanity verify 0<=partitionId<=numTotalPartitions-1 if (partitionId < 0 || partitionId > numTotalPartitions) { throw new IllegalStateException("partitionId should be less than numTotalPartitions with partitionId " + partitionId + " and numTotalPartitions " + numTotalPartitions); } - Class<? extends JobIdPartitioner> partitionerCls = configManager.getControlConfig().partitionerCls; + Class<? extends JobIdPartitioner> partitionerCls = MRHistoryJobConfig.get().getControlConfig().partitionerCls; JobIdPartitioner partitioner; try { partitioner = partitionerCls.newInstance(); @@ -142,16 +144,13 @@ public class JobHistorySpout extends BaseRichSpout { throw new IllegalStateException(e); } JobIdFilter jobIdFilter = new JobIdFilterByPartition(partitioner, numTotalPartitions, partitionId); - JobHistoryZKStateManager.instance().init(configManager.getZkStateConfig()); + JobHistoryZKStateManager.instance().init(MRHistoryJobConfig.get().getZkStateConfig()); JobHistoryZKStateManager.instance().ensureJobPartitions(numTotalPartitions); interceptor.setSpoutOutputCollector(collector); try { - jhfLCM = new JobHistoryDAOImpl(configManager.getJobHistoryEndpointConfig()); + jhfLCM = new JobHistoryDAOImpl(MRHistoryJobConfig.get().getJobHistoryEndpointConfig()); driver = new JHFCrawlerDriverImpl( - configManager.getEagleServiceConfig(), - configManager.getJobExtractorConfig(), - configManager.getControlConfig(), callback, jhfLCM, jobIdFilter, @@ -232,11 +231,9 @@ public class JobHistorySpout extends BaseRichSpout { } LOG.info("update process time stamp {}", minTimeStamp); - final MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig(); - final MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig(); Map<String, String> baseTags = new HashMap<String, String>() { { - put("site", jobExtractorConfig.site); + put("site", MRHistoryJobConfig.get().getJobExtractorConfig().site); } }; JobProcessTimeStampEntity entity = new JobProcessTimeStampEntity(); @@ -245,12 +242,12 @@ public class JobHistorySpout extends BaseRichSpout { entity.setTags(baseTags); IEagleServiceClient client = new EagleServiceClientImpl( - eagleServiceConfig.eagleServiceHost, - eagleServiceConfig.eagleServicePort, - eagleServiceConfig.username, - eagleServiceConfig.password); + MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost, + MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort, + MRHistoryJobConfig.get().getEagleServiceConfig().username, + MRHistoryJobConfig.get().getEagleServiceConfig().password); - client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000); + client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getJobExtractorConfig().readTimeoutSeconds * 1000); List<JobProcessTimeStampEntity> entities = new ArrayList<>(); entities.add(entity); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java index e18fe07..5a60ee3 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java @@ -179,4 +179,6 @@ public class Constants { public static final String TASK_LEVEL = "task"; public static final String JOB_COUNT_PER_DAY = "hadoop.job.day.count"; + public static final String HADOOP_HISTORY_TOTAL_METRIC_FORMAT = "hadoop.%s.history.%s"; + public static final String HADOOP_HISTORY_MINUTE_METRIC_FORMAT = "hadoop.%s.history.minute.%s"; }