http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java index 52bd8ea..278599d 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java @@ -18,62 +18,63 @@ package org.apache.eagle.jpm.mr.history.crawler; -import org.apache.commons.lang3.tuple.Pair; import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig; -import org.apache.eagle.jpm.util.JobIdFilter; import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateLCM; +import org.apache.eagle.jpm.util.JobIdFilter; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; /** - * single thread crawling per driver - * multiple drivers can achieve parallelism + * single thread crawling per driver. + * multiple drivers can achieve parallelism. * */ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver { private static final Logger LOG = LoggerFactory.getLogger(JHFCrawlerDriverImpl.class); - private final static int SLEEP_SECONDS_WHILE_QUEUE_IS_EMPTY = 10; - private final static String FORMAT_JOB_PROCESS_DATE = "%4d%02d%02d"; - private final static Pattern PATTERN_JOB_PROCESS_DATE = Pattern.compile("([0-9]{4})([0-9]{2})([0-9]{2})"); + private static final int SLEEP_SECONDS_WHILE_QUEUE_IS_EMPTY = 10; + private static final String FORMAT_JOB_PROCESS_DATE = "%4d%02d%02d"; + private static final Pattern PATTERN_JOB_PROCESS_DATE = Pattern.compile("([0-9]{4})([0-9]{2})([0-9]{2})"); private static final int INITIALIZED = 0x0; private static final int TODAY = 0x1; private static final int BEFORETODAY = 0x10; - private final int PROCESSED_JOB_KEEP_DAYS = 5; + private static final int PROCESSED_JOB_KEEP_DAYS = 5; - private int m_flag = INITIALIZED; // 0 not set, 1 TODAY, 2 BEFORETODAY - private Deque<Pair<Long, String> > m_processQueue = new LinkedList<>(); - private Set<String> m_processedJobFileNames = new HashSet<>(); + private int flag = INITIALIZED; // 0 not set, 1 TODAY, 2 BEFORETODAY + private Deque<Pair<Long, String>> processQueue = new LinkedList<>(); + private Set<String> processedJobFileNames = new HashSet<>(); - private final JobProcessDate m_proceeDate = new JobProcessDate(); - private boolean m_dryRun; - private JHFInputStreamCallback m_reader; - protected boolean m_zeroBasedMonth = true; + private final JobProcessDate processDate = new JobProcessDate(); + private boolean dryRun; + private JHFInputStreamCallback reader; + protected boolean zeroBasedMonth = true; - private JobHistoryZKStateLCM m_zkStatelcm; - private JobHistoryLCM m_jhfLCM; - private JobIdFilter m_jobFilter; - private int m_partitionId; - private TimeZone m_timeZone; + private JobHistoryZKStateLCM zkStateLcm; + private JobHistoryLCM jhfLCM; + private JobIdFilter jobFilter; + private int partitionId; + private TimeZone timeZone; public JHFCrawlerDriverImpl(MRHistoryJobConfig.JobHistoryEndpointConfig jobHistoryConfig, MRHistoryJobConfig.ControlConfig controlConfig, JHFInputStreamCallback reader, JobHistoryZKStateLCM zkStateLCM, JobHistoryLCM historyLCM, JobIdFilter jobFilter, int partitionId) throws Exception { - this.m_zeroBasedMonth = controlConfig.zeroBasedMonth; - this.m_dryRun = controlConfig.dryRun; - if (this.m_dryRun) LOG.info("this is a dry run"); - this.m_reader = reader; - m_jhfLCM = historyLCM;//new JobHistoryDAOImpl(jobHistoryConfig); - this.m_zkStatelcm = zkStateLCM; - this.m_partitionId = partitionId; - this.m_jobFilter = jobFilter; - m_timeZone = TimeZone.getTimeZone(controlConfig.timeZone); + this.zeroBasedMonth = controlConfig.zeroBasedMonth; + this.dryRun = controlConfig.dryRun; + if (this.dryRun) { + LOG.info("this is a dry run"); + } + this.reader = reader; + jhfLCM = historyLCM;//new JobHistoryDAOImpl(jobHistoryConfig); + this.zkStateLcm = zkStateLCM; + this.partitionId = partitionId; + this.jobFilter = jobFilter; + timeZone = TimeZone.getTimeZone(controlConfig.timeZone); } /** @@ -90,70 +91,70 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver { */ @Override public long crawl() throws Exception { - LOG.info("queue size is " + m_processQueue.size()); - while (m_processQueue.isEmpty()) { + LOG.info("queue size is " + processQueue.size()); + while (processQueue.isEmpty()) { // read lastProcessedDate only when it's initialized - if (m_flag == INITIALIZED) { + if (flag == INITIALIZED) { readAndCacheLastProcessedDate(); } - if (m_flag == BEFORETODAY) { + if (flag == BEFORETODAY) { updateProcessDate(); clearProcessedJobFileNames(); } - if (m_flag != TODAY) { // advance one day if initialized or BEFORE today + if (flag != TODAY) { // advance one day if initialized or BEFORE today advanceOneDay(); } if (isToday()) { - m_flag = TODAY; + flag = TODAY; } else { - m_flag = BEFORETODAY; + flag = BEFORETODAY; } - List<String> serialNumbers = m_jhfLCM.readSerialNumbers(this.m_proceeDate.year, getActualMonth(m_proceeDate.month), this.m_proceeDate.day); - List<Pair<Long, String> > allJobHistoryFiles = new LinkedList<>(); + List<String> serialNumbers = jhfLCM.readSerialNumbers(this.processDate.year, getActualMonth(processDate.month), this.processDate.day); + List<Pair<Long, String>> allJobHistoryFiles = new LinkedList<>(); for (String serialNumber : serialNumbers) { - List<Pair<Long, String> > jobHistoryFiles = m_jhfLCM.readFileNames( - this.m_proceeDate.year, - getActualMonth(m_proceeDate.month), - this.m_proceeDate.day, + List<Pair<Long, String>> jobHistoryFiles = jhfLCM.readFileNames( + this.processDate.year, + getActualMonth(processDate.month), + this.processDate.day, Integer.parseInt(serialNumber)); LOG.info("total number of job history files " + jobHistoryFiles.size()); for (Pair<Long, String> jobHistoryFile : jobHistoryFiles) { - if (m_jobFilter.accept(jobHistoryFile.getRight()) && !fileProcessed(jobHistoryFile.getRight())) { + if (jobFilter.accept(jobHistoryFile.getRight()) && !fileProcessed(jobHistoryFile.getRight())) { allJobHistoryFiles.add(jobHistoryFile); } } jobHistoryFiles.clear(); - LOG.info("after filtering, number of job history files " + m_processQueue.size()); + LOG.info("after filtering, number of job history files " + processQueue.size()); } Collections.sort(allJobHistoryFiles, - new Comparator<Pair<Long, String>>() { - @Override - public int compare(Pair<Long, String> o1, Pair<Long, String> o2) { - if (o1.getLeft() > o2.getLeft()) return 1; - else if (o1.getLeft() == o2.getLeft()) return 0; - else return -1; - } + (o1, o2) -> { + if (o1.getLeft() > o2.getLeft()) { + return 1; + } else if (o1.getLeft() == o2.getLeft()) { + return 0; + } else { + return -1; } + } ); for (Pair<Long, String> jobHistoryFile : allJobHistoryFiles) { - m_processQueue.add(jobHistoryFile); + processQueue.add(jobHistoryFile); } allJobHistoryFiles.clear(); - if (m_processQueue.isEmpty()) { + if (processQueue.isEmpty()) { Thread.sleep(SLEEP_SECONDS_WHILE_QUEUE_IS_EMPTY * 1000); } else { - LOG.info("queue size after populating is now : " + m_processQueue.size()); + LOG.info("queue size after populating is now : " + processQueue.size()); } } // start to process job history file - Pair<Long, String> item = m_processQueue.pollFirst(); + Pair<Long, String> item = processQueue.pollFirst(); String jobHistoryFile = item.getRight(); - Long modifiedTime = item.getLeft(); if (jobHistoryFile == null) { // terminate this round of crawling when the queue is empty LOG.info("process queue is empty, ignore this round"); return -1; @@ -168,33 +169,34 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver { LOG.warn("illegal job history file name : " + jobHistoryFile); return -1; } - if (!m_dryRun) { - m_jhfLCM.readFileContent( - m_proceeDate.year, - getActualMonth(m_proceeDate.month), - m_proceeDate.day, + if (!dryRun) { + jhfLCM.readFileContent( + processDate.year, + getActualMonth(processDate.month), + processDate.day, Integer.valueOf(serialNumber), jobHistoryFile, - m_reader); + reader); } - m_zkStatelcm.addProcessedJob(String.format(FORMAT_JOB_PROCESS_DATE, - this.m_proceeDate.year, - this.m_proceeDate.month + 1, - this.m_proceeDate.day), + zkStateLcm.addProcessedJob(String.format(FORMAT_JOB_PROCESS_DATE, + this.processDate.year, + this.processDate.month + 1, + this.processDate.day), jobHistoryFile); - m_processedJobFileNames.add(jobHistoryFile); + processedJobFileNames.add(jobHistoryFile); + Long modifiedTime = item.getLeft(); return modifiedTime; } private void updateProcessDate() throws Exception { - String line = String.format(FORMAT_JOB_PROCESS_DATE, this.m_proceeDate.year, - this.m_proceeDate.month + 1, this.m_proceeDate.day); - m_zkStatelcm.updateProcessedDate(m_partitionId, line); + String line = String.format(FORMAT_JOB_PROCESS_DATE, this.processDate.year, + this.processDate.month + 1, this.processDate.day); + zkStateLcm.updateProcessedDate(partitionId, line); } - private int getActualMonth(int month){ - return m_zeroBasedMonth ? m_proceeDate.month : m_proceeDate.month + 1; + private int getActualMonth(int month) { + return zeroBasedMonth ? processDate.month : processDate.month + 1; } private static class JobProcessDate { @@ -204,37 +206,37 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver { } private void clearProcessedJobFileNames() { - m_processedJobFileNames.clear(); + processedJobFileNames.clear(); } private void readAndCacheLastProcessedDate() throws Exception { - String lastProcessedDate = m_zkStatelcm.readProcessedDate(m_partitionId); + String lastProcessedDate = zkStateLcm.readProcessedDate(partitionId); Matcher m = PATTERN_JOB_PROCESS_DATE.matcher(lastProcessedDate); if (m.find() && m.groupCount() == 3) { - this.m_proceeDate.year = Integer.parseInt(m.group(1)); - this.m_proceeDate.month = Integer.parseInt(m.group(2)) - 1; // zero based month - this.m_proceeDate.day = Integer.parseInt(m.group(3)); + this.processDate.year = Integer.parseInt(m.group(1)); + this.processDate.month = Integer.parseInt(m.group(2)) - 1; // zero based month + this.processDate.day = Integer.parseInt(m.group(3)); } else { throw new IllegalStateException("job lastProcessedDate must have format YYYYMMDD " + lastProcessedDate); } - GregorianCalendar cal = new GregorianCalendar(m_timeZone); - cal.set(this.m_proceeDate.year, this.m_proceeDate.month, this.m_proceeDate.day, 0, 0, 0); + GregorianCalendar cal = new GregorianCalendar(timeZone); + cal.set(this.processDate.year, this.processDate.month, this.processDate.day, 0, 0, 0); cal.add(Calendar.DATE, 1); - List<String> list = m_zkStatelcm.readProcessedJobs(String.format(FORMAT_JOB_PROCESS_DATE, cal.get(Calendar.YEAR), + List<String> list = zkStateLcm.readProcessedJobs(String.format(FORMAT_JOB_PROCESS_DATE, cal.get(Calendar.YEAR), cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DAY_OF_MONTH))); if (list != null) { - this.m_processedJobFileNames = new HashSet<>(list); + this.processedJobFileNames = new HashSet<>(list); } } private void advanceOneDay() throws Exception { - GregorianCalendar cal = new GregorianCalendar(m_timeZone); - cal.set(this.m_proceeDate.year, this.m_proceeDate.month, this.m_proceeDate.day, 0, 0, 0); + GregorianCalendar cal = new GregorianCalendar(timeZone); + cal.set(this.processDate.year, this.processDate.month, this.processDate.day, 0, 0, 0); cal.add(Calendar.DATE, 1); - this.m_proceeDate.year = cal.get(Calendar.YEAR); - this.m_proceeDate.month = cal.get(Calendar.MONTH); - this.m_proceeDate.day = cal.get(Calendar.DAY_OF_MONTH); + this.processDate.year = cal.get(Calendar.YEAR); + this.processDate.month = cal.get(Calendar.MONTH); + this.processDate.day = cal.get(Calendar.DAY_OF_MONTH); try { clearProcessedJob(cal); @@ -249,29 +251,30 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver { cal.add(Calendar.DATE, -1 - PROCESSED_JOB_KEEP_DAYS); String line = String.format(FORMAT_JOB_PROCESS_DATE, cal.get(Calendar.YEAR), cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DAY_OF_MONTH)); - m_zkStatelcm.truncateProcessedJob(line); + zkStateLcm.truncateProcessedJob(line); } private boolean isToday() { - GregorianCalendar today = new GregorianCalendar(m_timeZone); + GregorianCalendar today = new GregorianCalendar(timeZone); - if (today.get(Calendar.YEAR) == this.m_proceeDate.year - && today.get(Calendar.MONTH) == this.m_proceeDate.month - && today.get(Calendar.DAY_OF_MONTH) == this.m_proceeDate.day) + if (today.get(Calendar.YEAR) == this.processDate.year + && today.get(Calendar.MONTH) == this.processDate.month + && today.get(Calendar.DAY_OF_MONTH) == this.processDate.day) { return true; - + } return false; } /** - * check if this file was already processed + * check if this file was already processed. * * @param fileName * @return */ private boolean fileProcessed(String fileName) { - if (m_processedJobFileNames.contains(fileName)) + if (processedJobFileNames.contains(fileName)) { return true; + } return false; } }
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFInputStreamCallback.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFInputStreamCallback.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFInputStreamCallback.java index 52a62a4..3e5ce1f 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFInputStreamCallback.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFInputStreamCallback.java @@ -24,11 +24,11 @@ import java.io.InputStream; import java.io.Serializable; /** - * callback when job history file input stream is ready + * callback when job history file input stream is ready. */ public interface JHFInputStreamCallback extends Serializable { /** - * this is called when job file string and job configuration file is ready + * this is called when job file string and job configuration file is ready. * @param is * @param configuration * @throws Exception http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilter.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilter.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilter.java index 66dbce1..2c866f6 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilter.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilter.java @@ -24,9 +24,14 @@ import java.util.regex.Pattern; public interface JobHistoryContentFilter extends Serializable { boolean acceptJobFile(); + boolean acceptJobConfFile(); + List<Pattern> getMustHaveJobConfKeyPatterns(); + List<Pattern> getJobConfKeyInclusionPatterns(); + List<Pattern> getJobConfKeyExclusionPatterns(); + String getJobNameKey(); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterBuilder.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterBuilder.java index 65b8dab..c43c366 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterBuilder.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterBuilder.java @@ -27,56 +27,57 @@ import java.util.List; import java.util.regex.Pattern; public class JobHistoryContentFilterBuilder { - private final static Logger LOG = LoggerFactory.getLogger(JobHistoryContentFilterBuilder.class); + private static final Logger LOG = LoggerFactory.getLogger(JobHistoryContentFilterBuilder.class); - private boolean m_acceptJobFile; - private boolean m_acceptJobConfFile; - private List<Pattern> m_mustHaveJobConfKeyPatterns; - private List<Pattern> m_jobConfKeyInclusionPatterns; - private List<Pattern> m_jobConfKeyExclusionPatterns; + private boolean acceptJobFile; + private boolean acceptJobConfFile; + private List<Pattern> mustHaveJobConfKeyPatterns; + private List<Pattern> jobConfKeyInclusionPatterns; + private List<Pattern> jobConfKeyExclusionPatterns; private String jobNameKey; - public static JobHistoryContentFilterBuilder newBuilder(){ + public static JobHistoryContentFilterBuilder newBuilder() { return new JobHistoryContentFilterBuilder(); } public JobHistoryContentFilterBuilder acceptJobFile() { - this.m_acceptJobFile = true; + this.acceptJobFile = true; return this; } public JobHistoryContentFilterBuilder acceptJobConfFile() { - this.m_acceptJobConfFile = true; + this.acceptJobConfFile = true; return this; } public JobHistoryContentFilterBuilder mustHaveJobConfKeyPatterns(Pattern ...patterns) { - m_mustHaveJobConfKeyPatterns = Arrays.asList(patterns); - if (m_jobConfKeyInclusionPatterns != null) { + mustHaveJobConfKeyPatterns = Arrays.asList(patterns); + if (jobConfKeyInclusionPatterns != null) { List<Pattern> list = new ArrayList<Pattern>(); - list.addAll(m_jobConfKeyInclusionPatterns); + list.addAll(jobConfKeyInclusionPatterns); list.addAll(Arrays.asList(patterns)); - m_jobConfKeyInclusionPatterns = list; + jobConfKeyInclusionPatterns = list; + } else { + jobConfKeyInclusionPatterns = Arrays.asList(patterns); } - else - m_jobConfKeyInclusionPatterns = Arrays.asList(patterns); return this; } public JobHistoryContentFilterBuilder includeJobKeyPatterns(Pattern ... patterns) { - if (m_jobConfKeyInclusionPatterns != null) { + if (jobConfKeyInclusionPatterns != null) { List<Pattern> list = new ArrayList<Pattern>(); - list.addAll(m_jobConfKeyInclusionPatterns); + list.addAll(jobConfKeyInclusionPatterns); list.addAll(Arrays.asList(patterns)); - m_jobConfKeyInclusionPatterns = list; - } else - m_jobConfKeyInclusionPatterns = Arrays.asList(patterns); + jobConfKeyInclusionPatterns = list; + } else { + jobConfKeyInclusionPatterns = Arrays.asList(patterns); + } return this; } public JobHistoryContentFilterBuilder excludeJobKeyPatterns(Pattern ...patterns) { - m_jobConfKeyExclusionPatterns = Arrays.asList(patterns); + jobConfKeyExclusionPatterns = Arrays.asList(patterns); return this; } @@ -87,11 +88,11 @@ public class JobHistoryContentFilterBuilder { public JobHistoryContentFilter build() { JobHistoryContentFilterImpl filter = new JobHistoryContentFilterImpl(); - filter.setAcceptJobFile(m_acceptJobFile); - filter.setAcceptJobConfFile(m_acceptJobConfFile); - filter.setMustHaveJobConfKeyPatterns(m_mustHaveJobConfKeyPatterns); - filter.setJobConfKeyInclusionPatterns(m_jobConfKeyInclusionPatterns); - filter.setJobConfKeyExclusionPatterns(m_jobConfKeyExclusionPatterns); + filter.setAcceptJobFile(acceptJobFile); + filter.setAcceptJobConfFile(acceptJobConfFile); + filter.setMustHaveJobConfKeyPatterns(mustHaveJobConfKeyPatterns); + filter.setJobConfKeyInclusionPatterns(jobConfKeyInclusionPatterns); + filter.setJobConfKeyExclusionPatterns(jobConfKeyExclusionPatterns); filter.setJobNameKey(jobNameKey); LOG.info("job history content filter:" + filter); return filter; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterImpl.java index 5e7a856..5340372 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterImpl.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterImpl.java @@ -22,37 +22,37 @@ import java.util.List; import java.util.regex.Pattern; public class JobHistoryContentFilterImpl implements JobHistoryContentFilter { - private boolean m_acceptJobFile; - private boolean m_acceptJobConfFile; - private List<Pattern> m_mustHaveJobConfKeyPatterns; - private List<Pattern> m_jobConfKeyInclusionPatterns; - private List<Pattern> m_jobConfKeyExclusionPatterns; + private boolean acceptJobFile; + private boolean acceptJobConfFile; + private List<Pattern> mustHaveJobConfKeyPatterns; + private List<Pattern> jobConfKeyInclusionPatterns; + private List<Pattern> jobConfKeyExclusionPatterns; private String jobNameKey; @Override public boolean acceptJobFile() { - return m_acceptJobFile; + return acceptJobFile; } @Override public boolean acceptJobConfFile() { - return m_acceptJobConfFile; + return acceptJobConfFile; } @Override public List<Pattern> getMustHaveJobConfKeyPatterns() { - return m_mustHaveJobConfKeyPatterns; + return mustHaveJobConfKeyPatterns; } @Override public List<Pattern> getJobConfKeyInclusionPatterns() { - return m_jobConfKeyInclusionPatterns; + return jobConfKeyInclusionPatterns; } @Override public List<Pattern> getJobConfKeyExclusionPatterns() { - return m_jobConfKeyExclusionPatterns; + return jobConfKeyExclusionPatterns; } @Override @@ -65,40 +65,40 @@ public class JobHistoryContentFilterImpl implements JobHistoryContentFilter { } public void setAcceptJobFile(boolean acceptJobFile) { - this.m_acceptJobFile = acceptJobFile; + this.acceptJobFile = acceptJobFile; } public void setAcceptJobConfFile(boolean acceptJobConfFile) { - this.m_acceptJobConfFile = acceptJobConfFile; + this.acceptJobConfFile = acceptJobConfFile; } public void setJobConfKeyInclusionPatterns( List<Pattern> jobConfKeyInclusionPatterns) { - this.m_jobConfKeyInclusionPatterns = jobConfKeyInclusionPatterns; + this.jobConfKeyInclusionPatterns = jobConfKeyInclusionPatterns; } public void setJobConfKeyExclusionPatterns( List<Pattern> jobConfKeyExclusionPatterns) { - this.m_jobConfKeyExclusionPatterns = jobConfKeyExclusionPatterns; + this.jobConfKeyExclusionPatterns = jobConfKeyExclusionPatterns; } public void setMustHaveJobConfKeyPatterns(List<Pattern> mustHaveJobConfKeyPatterns) { - this.m_mustHaveJobConfKeyPatterns = mustHaveJobConfKeyPatterns; + this.mustHaveJobConfKeyPatterns = mustHaveJobConfKeyPatterns; } public String toString() { StringBuilder sb = new StringBuilder(); sb.append("job history file:"); - sb.append(m_acceptJobFile); + sb.append(acceptJobFile); sb.append(", job config file:"); - sb.append(m_acceptJobConfFile); - if(m_acceptJobConfFile){ + sb.append(acceptJobConfFile); + if (acceptJobConfFile) { sb.append(", must contain keys:"); - sb.append(m_mustHaveJobConfKeyPatterns); + sb.append(mustHaveJobConfKeyPatterns); sb.append(", include keys:"); - sb.append(m_jobConfKeyInclusionPatterns); + sb.append(jobConfKeyInclusionPatterns); sb.append(", exclude keys:"); - sb.append(m_jobConfKeyExclusionPatterns); + sb.append(jobConfKeyExclusionPatterns); } return sb.toString(); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java index cfd5994..0441f1f 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java @@ -18,6 +18,8 @@ package org.apache.eagle.jpm.mr.history.crawler; +import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.JobHistoryEndpointConfig; +import org.apache.eagle.jpm.util.HDFSUtil; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -32,36 +34,32 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.List; -import org.apache.eagle.jpm.util.HDFSUtil; -import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.JobHistoryEndpointConfig; public class JobHistoryDAOImpl extends AbstractJobHistoryDAO { private static final Logger LOG = LoggerFactory.getLogger(JobHistoryDAOImpl.class); - private Configuration m_conf = new Configuration(); + private Configuration conf = new Configuration(); - private FileSystem m_hdfs; + private FileSystem hdfs; public JobHistoryDAOImpl(JobHistoryEndpointConfig endpointConfig) throws Exception { super(endpointConfig.basePath, endpointConfig.pathContainsJobTrackerName, endpointConfig.jobTrackerName); - this.m_conf.set("fs.defaultFS", endpointConfig.nnEndpoint); - this.m_conf.setBoolean("fs.hdfs.impl.disable.cache", true); + this.conf.set("fs.defaultFS", endpointConfig.nnEndpoint); + this.conf.setBoolean("fs.hdfs.impl.disable.cache", true); if (!endpointConfig.principal.equals("")) { - this.m_conf.set("hdfs.kerberos.principal", endpointConfig.principal); - this.m_conf.set("hdfs.keytab.file", endpointConfig.keyTab); + this.conf.set("hdfs.kerberos.principal", endpointConfig.principal); + this.conf.set("hdfs.keytab.file", endpointConfig.keyTab); } LOG.info("file system:" + endpointConfig.nnEndpoint); - m_hdfs = HDFSUtil.getFileSystem(m_conf); + hdfs = HDFSUtil.getFileSystem(conf); } @Override public void freshFileSystem() throws Exception { try { - m_hdfs.close(); - } catch (Exception e) { - + hdfs.close(); } finally { - m_hdfs = HDFSUtil.getFileSystem(m_conf); + hdfs = HDFSUtil.getFileSystem(conf); } } @@ -70,20 +68,17 @@ public class JobHistoryDAOImpl extends AbstractJobHistoryDAO { String latestJobTrackerName = null; try { Path hdfsFile = new Path(basePath); - FileStatus[] files = m_hdfs.listStatus(hdfsFile); + FileStatus[] files = hdfs.listStatus(hdfsFile); // Sort by modification time as order of desc - Arrays.sort(files, new Comparator<FileStatus>() { - @Override - public int compare(FileStatus o1, FileStatus o2) { - long comp = parseJobTrackerNameTimestamp(o1.getPath().toString()) - parseJobTrackerNameTimestamp(o2.getPath().toString()); - if (comp > 0l) { - return -1; - } else if (comp < 0l) { - return 1; - } - return 0; + Arrays.sort(files, (o1, o2) -> { + long comp = parseJobTrackerNameTimestamp(o1.getPath().toString()) - parseJobTrackerNameTimestamp(o2.getPath().toString()); + if (comp > 0L) { + return -1; + } else if (comp < 0L) { + return 1; } + return 0; }); for (FileStatus fs : files) { @@ -94,7 +89,7 @@ public class JobHistoryDAOImpl extends AbstractJobHistoryDAO { break; } } - } catch(Exception ex) { + } catch (Exception ex) { LOG.error("fail read job tracker name " + basePath, ex); throw ex; } @@ -108,7 +103,7 @@ public class JobHistoryDAOImpl extends AbstractJobHistoryDAO { LOG.info("crawl serial numbers under one day : " + dailyPath); try { Path hdfsFile = new Path(dailyPath); - FileStatus[] files = m_hdfs.listStatus(hdfsFile); + FileStatus[] files = hdfs.listStatus(hdfsFile); for (FileStatus fs : files) { if (fs.isDir()) { serialNumbers.add(fs.getPath().getName()); @@ -125,7 +120,8 @@ public class JobHistoryDAOImpl extends AbstractJobHistoryDAO { } StringBuilder sb = new StringBuilder(); for (String sn : serialNumbers) { - sb.append(sn);sb.append(","); + sb.append(sn); + sb.append(","); } LOG.info("crawled serialNumbers: " + sb); return serialNumbers; @@ -133,18 +129,19 @@ public class JobHistoryDAOImpl extends AbstractJobHistoryDAO { @SuppressWarnings("deprecation") @Override - public List<Pair<Long, String> > readFileNames(int year, int month, int day, int serialNumber) throws Exception { + public List<Pair<Long, String>> readFileNames(int year, int month, int day, int serialNumber) throws Exception { LOG.info("crawl file names under one serial number : " + year + "/" + month + "/" + day + ":" + serialNumber); - List<Pair<Long, String> > jobFileNames = new ArrayList<>(); + List<Pair<Long, String>> jobFileNames = new ArrayList<>(); String serialPath = buildWholePathToSerialNumber(year, month, day, serialNumber); try { Path hdfsFile = new Path(serialPath); // filter those files which is job configuration file in xml format - FileStatus[] files = m_hdfs.listStatus(hdfsFile, new PathFilter(){ + FileStatus[] files = hdfs.listStatus(hdfsFile, new PathFilter() { @Override - public boolean accept(Path path){ - if (path.getName().endsWith(".xml")) + public boolean accept(Path path) { + if (path.getName().endsWith(".xml")) { return false; + } return true; } }); @@ -156,7 +153,8 @@ public class JobHistoryDAOImpl extends AbstractJobHistoryDAO { if (LOG.isDebugEnabled()) { StringBuilder sb = new StringBuilder(); for (Pair<Long, String> sn : jobFileNames) { - sb.append(sn.getRight());sb.append(","); + sb.append(sn.getRight()); + sb.append(","); } LOG.debug("crawled: " + sb); } @@ -168,7 +166,7 @@ public class JobHistoryDAOImpl extends AbstractJobHistoryDAO { } /** - * it's the responsibility of caller to close input stream + * it's the responsibility of caller to close input stream. */ @Override public InputStream getJHFFileContentAsStream(int year, int month, int day, int serialNumber, String jobHistoryFileName) throws Exception { @@ -176,26 +174,28 @@ public class JobHistoryDAOImpl extends AbstractJobHistoryDAO { LOG.info("Read job history file: " + path); try { Path hdfsFile = new Path(path); - return m_hdfs.open(hdfsFile); - } catch(Exception ex) { + return hdfs.open(hdfsFile); + } catch (Exception ex) { LOG.error("fail getting hdfs file inputstream " + path, ex); throw ex; } } /** - * it's the responsibility of caller to close input stream + * it's the responsibility of caller to close input stream. */ @Override public InputStream getJHFConfContentAsStream(int year, int month, int day, int serialNumber, String jobHistoryFileName) throws Exception { String path = buildWholePathToJobConfFile(year, month, day, serialNumber,jobHistoryFileName); - if (path == null) return null; + if (path == null) { + return null; + } LOG.info("Read job conf file: " + path); try { Path hdfsFile = new Path(path); - return m_hdfs.open(hdfsFile); - } catch(Exception ex) { + return hdfs.open(hdfsFile); + } catch (Exception ex) { LOG.error("fail getting job configuration input stream from " + path, ex); throw ex; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryLCM.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryLCM.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryLCM.java index de8d3f7..f428b11 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryLCM.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryLCM.java @@ -25,12 +25,12 @@ import java.util.List; /** * Define various operations on job history file resource for lifecycle management - * + * <p></p> * The job history file directory structure supported is as follows: - * <basePath>/<jobTrackerName>/<year>/<month>/<day>/<serialNumber>/<jobHistoryFileName> - * - * In some hadoop version, <jobTrackerName> is not included - * + * basePath/jobTrackerName/year/month/day/serialNumber/jobHistoryFileName + * <p></p> + * In some hadoop version, jobTrackerName is not included + * <p></p> * The operations involved in resource read * - list job tracker names under basePath (mostly basePath is configured in entry mapreduce.jobhistory.done-dir of mapred-site.xml) * - list serial numbers under one day @@ -40,7 +40,9 @@ import java.util.List; */ public interface JobHistoryLCM { String calculateJobTrackerName(String basePath) throws Exception; + /** + * ... * @param year * @param month 0-based or 1-based month depending on hadoop cluster setting * @param day @@ -48,7 +50,9 @@ public interface JobHistoryLCM { * @throws Exception */ List<String> readSerialNumbers(int year, int month, int day) throws Exception; + /** + * ... * @param year * @param month 0-based or 1-based month depending on hadoop cluster setting * @param day @@ -56,8 +60,10 @@ public interface JobHistoryLCM { * @return * @throws Exception */ - List<Pair<Long, String> > readFileNames(int year, int month, int day, int serialNumber) throws Exception; + List<Pair<Long, String>> readFileNames(int year, int month, int day, int serialNumber) throws Exception; + /** + * ... * @param year * @param month 0-based or 1-based month depending on hadoop cluster setting * @param day @@ -67,7 +73,9 @@ public interface JobHistoryLCM { * @throws Exception */ void readFileContent(int year, int month, int day, int serialNumber, String jobHistoryFileName, JHFInputStreamCallback reader) throws Exception; + /** + * ... * @param year * @param month 0-based or 1-based month depending on hadoop cluster setting * @param day @@ -77,10 +85,8 @@ public interface JobHistoryLCM { * @throws Exception */ InputStream getJHFFileContentAsStream(int year, int month, int day, int serialNumber, String jobHistoryFileName) throws Exception; + InputStream getJHFConfContentAsStream(int year, int month, int day, int serialNumber, String jobConfFileName) throws Exception; - /** - * - */ void freshFileSystem() throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java index 211dd9d..a7dc9a8 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java @@ -18,18 +18,18 @@ package org.apache.eagle.jpm.mr.history.crawler; -import backtype.storm.spout.SpoutOutputCollector; import org.apache.eagle.dataproc.impl.storm.ValuesArray; +import backtype.storm.spout.SpoutOutputCollector; public class JobHistorySpoutCollectorInterceptor implements EagleOutputCollector { - private SpoutOutputCollector m_collector; + private SpoutOutputCollector collector; public void setSpoutOutputCollector(SpoutOutputCollector collector) { - this.m_collector = collector; + this.collector = collector; } @Override public void collect(ValuesArray t) { - m_collector.emit(t); + collector.emit(t); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java index 892c2ea..dec3f82 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java @@ -23,19 +23,17 @@ import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity; /** * generalizing this listener would decouple entity creation and entity handling, also will help unit testing. - * - * @author yonzhang */ public interface HistoryJobEntityCreationListener { /** - * job entity created event + * job entity created event. * * @param entity */ void jobEntityCreated(JobBaseAPIEntity entity) throws Exception; /** - * for streaming processing, flush would help commit the last several entities + * for streaming processing, flush would help commit the last several entities. */ void flush() throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java index 82e305a..5d3d5b4 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java @@ -41,30 +41,30 @@ import java.util.regex.Pattern; */ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(JHFEventReaderBase.class); - protected Map<String, String> m_baseTags; - protected JobEventAPIEntity m_jobSubmitEventEntity; - protected JobEventAPIEntity m_jobLaunchEventEntity; - protected int m_numTotalMaps; - protected int m_numTotalReduces; - protected JobEventAPIEntity m_jobFinishEventEntity; - protected JobExecutionAPIEntity m_jobExecutionEntity; - protected Map<String, Long> m_taskStartTime; + protected Map<String, String> baseTags; + protected JobEventAPIEntity jobSubmitEventEntity; + protected JobEventAPIEntity jobLaunchEventEntity; + protected int numTotalMaps; + protected int numTotalReduces; + protected JobEventAPIEntity jobFinishEventEntity; + protected JobExecutionAPIEntity jobExecutionEntity; + protected Map<String, Long> taskStartTime; // taskAttemptID to task attempt startTime - protected Map<String, Long> m_taskAttemptStartTime; + protected Map<String, Long> taskAttemptStartTime; // taskID to host mapping, for task it's the host where the last attempt runs on - protected Map<String, String> m_taskRunningHosts; + protected Map<String, String> taskRunningHosts; // hostname to rack mapping - protected Map<String, String> m_host2RackMapping; + protected Map<String, String> host2RackMapping; - protected String m_jobId; - protected String m_jobName; - protected String m_jobType; - protected String m_jobDefId; - protected String m_user; - protected String m_queueName; - protected Long m_jobLauchTime; - protected JobHistoryContentFilter m_filter; + protected String jobId; + protected String jobName; + protected String jobType; + protected String jobDefId; + protected String user; + protected String queueName; + protected Long jobLaunchTime; + protected JobHistoryContentFilter filter; protected final List<HistoryJobEntityLifecycleListener> jobEntityLifecycleListeners = new ArrayList<>(); @@ -90,42 +90,42 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl } /** - * baseTags stores the basic tag name values which might be used for persisting various entities + * baseTags stores the basic tag name values which might be used for persisting various entities. * baseTags includes: cluster, datacenter and jobName * baseTags are used for all job/task related entities * * @param baseTags */ public JHFEventReaderBase(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) { - this.m_filter = filter; + this.filter = filter; - this.m_baseTags = baseTags; - m_jobSubmitEventEntity = new JobEventAPIEntity(); - m_jobSubmitEventEntity.setTags(new HashMap<>(baseTags)); + this.baseTags = baseTags; + jobSubmitEventEntity = new JobEventAPIEntity(); + jobSubmitEventEntity.setTags(new HashMap<>(baseTags)); - m_jobLaunchEventEntity = new JobEventAPIEntity(); - m_jobLaunchEventEntity.setTags(new HashMap<>(baseTags)); + jobLaunchEventEntity = new JobEventAPIEntity(); + jobLaunchEventEntity.setTags(new HashMap<>(baseTags)); - m_jobFinishEventEntity = new JobEventAPIEntity(); - m_jobFinishEventEntity.setTags(new HashMap<>(baseTags)); + jobFinishEventEntity = new JobEventAPIEntity(); + jobFinishEventEntity.setTags(new HashMap<>(baseTags)); - m_jobExecutionEntity = new JobExecutionAPIEntity(); - m_jobExecutionEntity.setTags(new HashMap<>(baseTags)); + jobExecutionEntity = new JobExecutionAPIEntity(); + jobExecutionEntity.setTags(new HashMap<>(baseTags)); - m_taskRunningHosts = new HashMap<>(); + taskRunningHosts = new HashMap<>(); - m_host2RackMapping = new HashMap<>(); + host2RackMapping = new HashMap<>(); - m_taskStartTime = new HashMap<>(); - m_taskAttemptStartTime = new HashMap<>(); + taskStartTime = new HashMap<>(); + taskAttemptStartTime = new HashMap<>(); this.configuration = configuration; - if (this.configuration != null && this.m_jobType == null) { + if (this.configuration != null && this.jobType == null) { this.setJobType(fetchJobType(this.configuration).toString()); } - this.sumMapTaskDuration = 0l; - this.sumReduceTaskDuration = 0l; + this.sumMapTaskDuration = 0L; + this.sumReduceTaskDuration = 0L; } public void register(HistoryJobEntityLifecycleListener lifecycleListener) { @@ -135,8 +135,8 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl @Override public void close() throws IOException { // check if this job history file is complete - if (m_jobExecutionEntity.getEndTime() == 0L) { - throw new IOException(new JHFWriteNotCompletedException(m_jobId)); + if (jobExecutionEntity.getEndTime() == 0L) { + throw new IOException(new JHFWriteNotCompletedException(jobId)); } try { flush(); @@ -154,134 +154,135 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl } /** + * ... * @param id */ private void setJobID(String id) { - this.m_jobId = id; + this.jobId = id; } private void setJobType(String jobType) { - this.m_jobType = jobType; + this.jobType = jobType; } protected void handleJob(EventType eventType, Map<Keys, String> values, Object totalCounters) throws Exception { String id = values.get(Keys.JOBID); - if (m_jobId == null) { + if (jobId == null) { setJobID(id); - } else if (!m_jobId.equals(id)) { - String msg = "Current job ID '" + id + "' does not match previously stored value '" + m_jobId + "'"; + } else if (!jobId.equals(id)) { + String msg = "Current job ID '" + id + "' does not match previously stored value '" + jobId + "'"; LOG.error(msg); throw new ImportException(msg); } if (values.get(Keys.SUBMIT_TIME) != null) { // job submitted - m_jobSubmitEventEntity.setTimestamp(Long.valueOf(values.get(Keys.SUBMIT_TIME))); - m_user = values.get(Keys.USER); - m_queueName = values.get(Keys.JOB_QUEUE); - m_jobName = values.get(Keys.JOBNAME); + jobSubmitEventEntity.setTimestamp(Long.valueOf(values.get(Keys.SUBMIT_TIME))); + user = values.get(Keys.USER); + queueName = values.get(Keys.JOB_QUEUE); + jobName = values.get(Keys.JOBNAME); // If given job name then use it as norm job name, otherwise use eagle JobNameNormalization rule to generate. String jobDefId = null; if (configuration != null) { - jobDefId = configuration.get(m_filter.getJobNameKey()); + jobDefId = configuration.get(filter.getJobNameKey()); } if (jobDefId == null) { - m_jobDefId = JobNameNormalization.getInstance().normalize(m_jobName); + this.jobDefId = JobNameNormalization.getInstance().normalize(jobName); } else { LOG.debug("Got JobDefId from job configuration for " + id + ": " + jobDefId); - m_jobDefId = jobDefId; + this.jobDefId = jobDefId; } - LOG.info("JobDefId of " + id + ": " + m_jobDefId); + LOG.info("JobDefId of " + id + ": " + this.jobDefId); - m_jobSubmitEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user); - m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId); - m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), EagleJobStatus.SUBMITTED.name()); - m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName); - m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId); - m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.m_jobType); - entityCreated(m_jobSubmitEventEntity); + jobSubmitEventEntity.getTags().put(MRJobTagName.USER.toString(), user); + jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), jobId); + jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), EagleJobStatus.SUBMITTED.name()); + jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), jobName); + jobSubmitEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), this.jobDefId); + jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.jobType); + entityCreated(jobSubmitEventEntity); } else if (values.get(Keys.LAUNCH_TIME) != null) { // job launched - m_jobLaunchEventEntity.setTimestamp(Long.valueOf(values.get(Keys.LAUNCH_TIME))); - m_jobLauchTime = m_jobLaunchEventEntity.getTimestamp(); - m_jobLaunchEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user); - m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId); - m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), EagleJobStatus.LAUNCHED.name()); - m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName); - m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId); - m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.m_jobType); - m_numTotalMaps = Integer.valueOf(values.get(Keys.TOTAL_MAPS)); - m_numTotalReduces = Integer.valueOf(values.get(Keys.TOTAL_REDUCES)); - entityCreated(m_jobLaunchEventEntity); + jobLaunchEventEntity.setTimestamp(Long.valueOf(values.get(Keys.LAUNCH_TIME))); + jobLaunchTime = jobLaunchEventEntity.getTimestamp(); + jobLaunchEventEntity.getTags().put(MRJobTagName.USER.toString(), user); + jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), jobId); + jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), EagleJobStatus.LAUNCHED.name()); + jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), jobName); + jobLaunchEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), jobDefId); + jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.jobType); + numTotalMaps = Integer.valueOf(values.get(Keys.TOTAL_MAPS)); + numTotalReduces = Integer.valueOf(values.get(Keys.TOTAL_REDUCES)); + entityCreated(jobLaunchEventEntity); } else if (values.get(Keys.FINISH_TIME) != null) { // job finished - m_jobFinishEventEntity.setTimestamp(Long.valueOf(values.get(Keys.FINISH_TIME))); - m_jobFinishEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user); - m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId); - m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), values.get(Keys.JOB_STATUS)); - m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName); - m_jobFinishEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId); - m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.m_jobType); - entityCreated(m_jobFinishEventEntity); + jobFinishEventEntity.setTimestamp(Long.valueOf(values.get(Keys.FINISH_TIME))); + jobFinishEventEntity.getTags().put(MRJobTagName.USER.toString(), user); + jobFinishEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), jobId); + jobFinishEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), values.get(Keys.JOB_STATUS)); + jobFinishEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), jobName); + jobFinishEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), jobDefId); + jobFinishEventEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.jobType); + entityCreated(jobFinishEventEntity); // populate jobExecutionEntity entity - m_jobExecutionEntity.getTags().put(MRJobTagName.USER.toString(), m_user); - m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId); - m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName); - m_jobExecutionEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId); - m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_QUEUE.toString(), m_queueName); - m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.m_jobType); - - m_jobExecutionEntity.setCurrentState(values.get(Keys.JOB_STATUS)); - m_jobExecutionEntity.setStartTime(m_jobLaunchEventEntity.getTimestamp()); - m_jobExecutionEntity.setEndTime(m_jobFinishEventEntity.getTimestamp()); - m_jobExecutionEntity.setDurationTime(m_jobExecutionEntity.getEndTime() - m_jobExecutionEntity.getStartTime()); - m_jobExecutionEntity.setTimestamp(m_jobLaunchEventEntity.getTimestamp()); - m_jobExecutionEntity.setSubmissionTime(m_jobSubmitEventEntity.getTimestamp()); + jobExecutionEntity.getTags().put(MRJobTagName.USER.toString(), user); + jobExecutionEntity.getTags().put(MRJobTagName.JOB_ID.toString(), jobId); + jobExecutionEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), jobName); + jobExecutionEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), jobDefId); + jobExecutionEntity.getTags().put(MRJobTagName.JOB_QUEUE.toString(), queueName); + jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.jobType); + + jobExecutionEntity.setCurrentState(values.get(Keys.JOB_STATUS)); + jobExecutionEntity.setStartTime(jobLaunchEventEntity.getTimestamp()); + jobExecutionEntity.setEndTime(jobFinishEventEntity.getTimestamp()); + jobExecutionEntity.setDurationTime(jobExecutionEntity.getEndTime() - jobExecutionEntity.getStartTime()); + jobExecutionEntity.setTimestamp(jobLaunchEventEntity.getTimestamp()); + jobExecutionEntity.setSubmissionTime(jobSubmitEventEntity.getTimestamp()); if (values.get(Keys.FAILED_MAPS) != null) { // for Artemis - m_jobExecutionEntity.setNumFailedMaps(Integer.valueOf(values.get(Keys.FAILED_MAPS))); + jobExecutionEntity.setNumFailedMaps(Integer.valueOf(values.get(Keys.FAILED_MAPS))); } if (values.get(Keys.FAILED_REDUCES) != null) { // for Artemis - m_jobExecutionEntity.setNumFailedReduces(Integer.valueOf(values.get(Keys.FAILED_REDUCES))); + jobExecutionEntity.setNumFailedReduces(Integer.valueOf(values.get(Keys.FAILED_REDUCES))); } - m_jobExecutionEntity.setNumFinishedMaps(Integer.valueOf(values.get(Keys.FINISHED_MAPS))); - m_jobExecutionEntity.setNumFinishedReduces(Integer.valueOf(values.get(Keys.FINISHED_REDUCES))); - m_jobExecutionEntity.setNumTotalMaps(m_numTotalMaps); - m_jobExecutionEntity.setNumTotalReduces(m_numTotalReduces); + jobExecutionEntity.setNumFinishedMaps(Integer.valueOf(values.get(Keys.FINISHED_MAPS))); + jobExecutionEntity.setNumFinishedReduces(Integer.valueOf(values.get(Keys.FINISHED_REDUCES))); + jobExecutionEntity.setNumTotalMaps(numTotalMaps); + jobExecutionEntity.setNumTotalReduces(numTotalReduces); if (values.get(Keys.COUNTERS) != null || totalCounters != null) { JobCounters jobCounters = parseCounters(totalCounters); - m_jobExecutionEntity.setJobCounters(jobCounters); + jobExecutionEntity.setJobCounters(jobCounters); if (jobCounters.getCounters().containsKey(Constants.JOB_COUNTER)) { Map<String, Long> counters = jobCounters.getCounters().get(Constants.JOB_COUNTER); if (counters.containsKey(Constants.JobCounter.DATA_LOCAL_MAPS.toString())) { - m_jobExecutionEntity.setDataLocalMaps(counters.get(Constants.JobCounter.DATA_LOCAL_MAPS.toString()).intValue()); + jobExecutionEntity.setDataLocalMaps(counters.get(Constants.JobCounter.DATA_LOCAL_MAPS.toString()).intValue()); } if (counters.containsKey(Constants.JobCounter.RACK_LOCAL_MAPS.toString())) { - m_jobExecutionEntity.setRackLocalMaps(counters.get(Constants.JobCounter.RACK_LOCAL_MAPS.toString()).intValue()); + jobExecutionEntity.setRackLocalMaps(counters.get(Constants.JobCounter.RACK_LOCAL_MAPS.toString()).intValue()); } if (counters.containsKey(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString())) { - m_jobExecutionEntity.setTotalLaunchedMaps(counters.get(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString()).intValue()); + jobExecutionEntity.setTotalLaunchedMaps(counters.get(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString()).intValue()); } } - if (m_jobExecutionEntity.getTotalLaunchedMaps() > 0) { - m_jobExecutionEntity.setDataLocalMapsPercentage(m_jobExecutionEntity.getDataLocalMaps() * 1.0 / m_jobExecutionEntity.getTotalLaunchedMaps()); - m_jobExecutionEntity.setRackLocalMapsPercentage(m_jobExecutionEntity.getRackLocalMaps() * 1.0 / m_jobExecutionEntity.getTotalLaunchedMaps()); + if (jobExecutionEntity.getTotalLaunchedMaps() > 0) { + jobExecutionEntity.setDataLocalMapsPercentage(jobExecutionEntity.getDataLocalMaps() * 1.0 / jobExecutionEntity.getTotalLaunchedMaps()); + jobExecutionEntity.setRackLocalMapsPercentage(jobExecutionEntity.getRackLocalMaps() * 1.0 / jobExecutionEntity.getTotalLaunchedMaps()); } } - m_jobExecutionEntity.setAvgMapTaskDuration(this.sumMapTaskDuration * 1.0 / m_numTotalMaps); - if (m_numTotalReduces == 0) { - m_jobExecutionEntity.setMaxReduceTaskDuration(0); - m_jobExecutionEntity.setAvgReduceTaskDuration(0); + jobExecutionEntity.setAvgMapTaskDuration(this.sumMapTaskDuration * 1.0 / numTotalMaps); + if (numTotalReduces == 0) { + jobExecutionEntity.setMaxReduceTaskDuration(0); + jobExecutionEntity.setAvgReduceTaskDuration(0); } else { - m_jobExecutionEntity.setAvgReduceTaskDuration(this.sumReduceTaskDuration * 1.0 / m_numTotalReduces); + jobExecutionEntity.setAvgReduceTaskDuration(this.sumReduceTaskDuration * 1.0 / numTotalReduces); } - entityCreated(m_jobExecutionEntity); + entityCreated(jobExecutionEntity); } } @@ -290,8 +291,8 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl lifecycleListener.jobEntityCreated(entity); } - // job finished when passing JobExecutionAPIEntity: m_jobExecutionEntity - if (entity == this.m_jobExecutionEntity) { + // job finished when passing JobExecutionAPIEntity: jobExecutionEntity + if (entity == this.jobExecutionEntity) { for (HistoryJobEntityLifecycleListener lifecycleListener : this.jobEntityLifecycleListeners) { lifecycleListener.jobFinish(); } @@ -317,49 +318,51 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl final String taskType = values.get(Keys.TASK_TYPE); final String taskID = values.get(Keys.TASKID); - Map<String, String> taskBaseTags = new HashMap<String, String>() {{ - put(MRJobTagName.TASK_TYPE.toString(), taskType); - put(MRJobTagName.USER.toString(), m_user); - //put(MRJobTagName.JOB_NAME.toString(), _jobName); - put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId); - put(MRJobTagName.JOB_TYPE.toString(), m_jobType); - put(MRJobTagName.JOB_ID.toString(), m_jobId); - put(MRJobTagName.TASK_ID.toString(), taskID); - }}; - taskBaseTags.putAll(m_baseTags); + Map<String, String> taskBaseTags = new HashMap<String, String>() { + { + put(MRJobTagName.TASK_TYPE.toString(), taskType); + put(MRJobTagName.USER.toString(), user); + //put(MRJobTagName.JOB_NAME.toString(), _jobName); + put(MRJobTagName.JOD_DEF_ID.toString(), jobDefId); + put(MRJobTagName.JOB_TYPE.toString(), jobType); + put(MRJobTagName.JOB_ID.toString(), jobId); + put(MRJobTagName.TASK_ID.toString(), taskID); + } + }; + taskBaseTags.putAll(baseTags); if (recType == RecordTypes.Task && startTime != null) { // task start, no host is assigned yet - m_taskStartTime.put(taskID, Long.valueOf(startTime)); + taskStartTime.put(taskID, Long.valueOf(startTime)); } else if (recType == RecordTypes.Task && finishTime != null) { // task finish // task execution entity setup - TaskExecutionAPIEntity entity = new TaskExecutionAPIEntity(); Map<String, String> taskExecutionTags = new HashMap<>(taskBaseTags); - String hostname = m_taskRunningHosts.get(taskID); + String hostname = taskRunningHosts.get(taskID); hostname = (hostname == null) ? "" : hostname; // TODO if task fails, then no hostname taskExecutionTags.put(MRJobTagName.HOSTNAME.toString(), hostname); - taskExecutionTags.put(MRJobTagName.RACK.toString(), m_host2RackMapping.get(hostname)); + taskExecutionTags.put(MRJobTagName.RACK.toString(), host2RackMapping.get(hostname)); + TaskExecutionAPIEntity entity = new TaskExecutionAPIEntity(); entity.setTags(taskExecutionTags); - entity.setStartTime(m_taskStartTime.get(taskID)); + entity.setStartTime(taskStartTime.get(taskID)); entity.setEndTime(Long.valueOf(finishTime)); entity.setDuration(entity.getEndTime() - entity.getStartTime()); - entity.setTimestamp(m_jobLauchTime); + entity.setTimestamp(jobLaunchTime); entity.setError(values.get(Keys.ERROR)); entity.setTaskStatus(values.get(Keys.TASK_STATUS)); if (values.get(Keys.COUNTERS) != null || counters != null) { entity.setJobCounters(parseCounters(counters)); } - long duration = entity.getEndTime() - m_jobSubmitEventEntity.getTimestamp(); - if (taskType.equals(Constants.TaskType.MAP.toString()) && duration > m_jobExecutionEntity.getLastMapDuration()) { - m_jobExecutionEntity.setLastMapDuration(duration); + long duration = entity.getEndTime() - jobSubmitEventEntity.getTimestamp(); + if (taskType.equals(Constants.TaskType.MAP.toString()) && duration > jobExecutionEntity.getLastMapDuration()) { + jobExecutionEntity.setLastMapDuration(duration); } - if (taskType.equals(Constants.TaskType.REDUCE.toString()) && duration > m_jobExecutionEntity.getLastReduceDuration()) { - m_jobExecutionEntity.setLastReduceDuration(duration); + if (taskType.equals(Constants.TaskType.REDUCE.toString()) && duration > jobExecutionEntity.getLastReduceDuration()) { + jobExecutionEntity.setLastReduceDuration(duration); } - if (taskType.equals(Constants.TaskType.MAP.toString()) && entity.getDuration() > m_jobExecutionEntity.getMaxMapTaskDuration()) { - m_jobExecutionEntity.setMaxMapTaskDuration(entity.getDuration()); + if (taskType.equals(Constants.TaskType.MAP.toString()) && entity.getDuration() > jobExecutionEntity.getMaxMapTaskDuration()) { + jobExecutionEntity.setMaxMapTaskDuration(entity.getDuration()); } - if (taskType.equals(Constants.TaskType.REDUCE.toString()) && entity.getDuration() > m_jobExecutionEntity.getMaxReduceTaskDuration()) { - m_jobExecutionEntity.setMaxReduceTaskDuration(entity.getDuration()); + if (taskType.equals(Constants.TaskType.REDUCE.toString()) && entity.getDuration() > jobExecutionEntity.getMaxReduceTaskDuration()) { + jobExecutionEntity.setMaxReduceTaskDuration(entity.getDuration()); } if (taskType.equals(Constants.TaskType.MAP.toString())) { @@ -371,7 +374,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl entityCreated(entity); //_taskStartTime.remove(taskID); // clean this taskID } else if ((recType == RecordTypes.MapAttempt || recType == RecordTypes.ReduceAttempt) && startTime != null) { // task attempt start - m_taskAttemptStartTime.put(taskAttemptID, Long.valueOf(startTime)); + taskAttemptStartTime.put(taskAttemptID, Long.valueOf(startTime)); } else if ((recType == RecordTypes.MapAttempt || recType == RecordTypes.ReduceAttempt) && finishTime != null) { // task attempt finish TaskAttemptExecutionAPIEntity entity = new TaskAttemptExecutionAPIEntity(); Map<String, String> taskAttemptExecutionTags = new HashMap<>(taskBaseTags); @@ -381,16 +384,16 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl taskAttemptExecutionTags.put(MRJobTagName.HOSTNAME.toString(), hostname); taskAttemptExecutionTags.put(MRJobTagName.RACK.toString(), rack); // put last attempt's hostname to task level - m_taskRunningHosts.put(taskID, hostname); + taskRunningHosts.put(taskID, hostname); // it is very likely that an attempt ID could be both succeeded and failed due to M/R system // in this case, we should ignore this attempt? - if (m_taskAttemptStartTime.get(taskAttemptID) == null) { + if (taskAttemptStartTime.get(taskAttemptID) == null) { LOG.warn("task attemp has consistency issue " + taskAttemptID); return; } - entity.setStartTime(m_taskAttemptStartTime.get(taskAttemptID)); + entity.setStartTime(taskAttemptStartTime.get(taskAttemptID)); entity.setEndTime(Long.valueOf(finishTime)); - entity.setTimestamp(m_jobLauchTime); + entity.setTimestamp(jobLaunchTime); entity.setDuration(entity.getEndTime() - entity.getStartTime()); entity.setTaskStatus(values.get(Keys.TASK_STATUS)); entity.setError(values.get(Keys.ERROR)); @@ -400,7 +403,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl } entity.setTaskAttemptID(taskAttemptID); entityCreated(entity); - m_taskAttemptStartTime.remove(taskAttemptID); + taskAttemptStartTime.remove(taskAttemptID); } else { // silently ignore LOG.warn("It's an exceptional case ?"); @@ -410,7 +413,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl public void parseConfiguration() throws Exception { Map<String, String> prop = new TreeMap<>(); - if (m_filter.acceptJobConfFile()) { + if (filter.acceptJobConfFile()) { Iterator<Map.Entry<String, String>> iter = configuration.iterator(); while (iter.hasNext()) { String key = iter.next().getKey(); @@ -423,28 +426,28 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl // check must-have keys are within prop if (matchMustHaveKeyPatterns(prop)) { JobConfigurationAPIEntity jobConfigurationEntity = new JobConfigurationAPIEntity(); - jobConfigurationEntity.setTags(new HashMap<>(m_baseTags)); - jobConfigurationEntity.getTags().put(MRJobTagName.USER.toString(), m_user); - jobConfigurationEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId); - jobConfigurationEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName); - jobConfigurationEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId); - jobConfigurationEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), m_jobType); - jobConfigurationEntity.setTimestamp(m_jobLaunchEventEntity.getTimestamp()); + jobConfigurationEntity.setTags(new HashMap<>(baseTags)); + jobConfigurationEntity.getTags().put(MRJobTagName.USER.toString(), user); + jobConfigurationEntity.getTags().put(MRJobTagName.JOB_ID.toString(), jobId); + jobConfigurationEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), jobName); + jobConfigurationEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), jobDefId); + jobConfigurationEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), jobType); + jobConfigurationEntity.setTimestamp(jobLaunchEventEntity.getTimestamp()); JobConfig jobConfig = new JobConfig(); jobConfig.setConfig(prop); jobConfigurationEntity.setJobConfig(jobConfig); - jobConfigurationEntity.setConfigJobName(m_jobDefId); + jobConfigurationEntity.setConfigJobName(jobDefId); entityCreated(jobConfigurationEntity); } } private boolean matchMustHaveKeyPatterns(Map<String, String> prop) { - if (m_filter.getMustHaveJobConfKeyPatterns() == null) { + if (filter.getMustHaveJobConfKeyPatterns() == null) { return true; } - for (Pattern p : m_filter.getMustHaveJobConfKeyPatterns()) { + for (Pattern p : filter.getMustHaveJobConfKeyPatterns()) { boolean matched = false; for (String key : prop.keySet()) { if (p.matcher(key).matches()) { @@ -460,10 +463,10 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl } private boolean included(String key) { - if (m_filter.getJobConfKeyInclusionPatterns() == null) { + if (filter.getJobConfKeyInclusionPatterns() == null) { return true; } - for (Pattern p : m_filter.getJobConfKeyInclusionPatterns()) { + for (Pattern p : filter.getJobConfKeyInclusionPatterns()) { Matcher m = p.matcher(key); if (m.matches()) { LOG.info("include key: " + p.toString()); @@ -474,10 +477,10 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl } private boolean excluded(String key) { - if (m_filter.getJobConfKeyExclusionPatterns() == null) { + if (filter.getJobConfKeyExclusionPatterns() == null) { return false; } - for (Pattern p : m_filter.getJobConfKeyExclusionPatterns()) { + for (Pattern p : filter.getJobConfKeyExclusionPatterns()) { Matcher m = p.matcher(key); if (m.matches()) { return true; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java index 6932dad..e20836f 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java @@ -37,14 +37,12 @@ import java.util.Map; /** * Listener holds all informations related to one whole job history file, so it's stateful and does not support multithreading. - * - * @author yonzhang */ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer1PerLineListener { private static final Logger logger = LoggerFactory.getLogger(JHFMRVer1EventReader.class); /** - * baseTags stores the basic tag name values which might be used for persisting various entities + * baseTags stores the basic tag name values which might be used for persisting various entities. * baseTags includes: cluster, datacenter and jobName * baseTags are used for all job/task related entities * @@ -93,12 +91,12 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer String[] tmp = decoratedHostname.split("/"); hostname = tmp[tmp.length - 1]; rack = tmp[tmp.length - 2]; - m_host2RackMapping.put(hostname, rack); + host2RackMapping.put(hostname, rack); } else if (values.get(Keys.TASK_STATUS).equals(EagleTaskStatus.KILLED.name()) || values.get(Keys.TASK_STATUS).equals(EagleTaskStatus.FAILED.name())) { hostname = values.get(Keys.HOSTNAME); // make every effort to get RACK information hostname = (hostname == null) ? "" : hostname; - rack = m_host2RackMapping.get(hostname); + rack = host2RackMapping.get(hostname); } values.put(Keys.HOSTNAME, hostname); @@ -149,6 +147,6 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer } public JobExecutionAPIEntity jobExecution() { - return m_jobExecutionEntity; + return jobExecutionEntity; } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java index ab59a41..2f63703 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java @@ -43,10 +43,10 @@ public class JHFMRVer1Parser implements JHFParserBase { static final Pattern pattern = Pattern.compile(KEY + "=" + "\"" + VALUE + "\""); static final String MAX_COUNTER_COUNT = "10000"; - private JHFMRVer1EventReader m_reader; + private JHFMRVer1EventReader reader; public JHFMRVer1Parser(JHFMRVer1EventReader reader) { - this.m_reader = reader; + this.reader = reader; } /** @@ -85,13 +85,14 @@ public class JHFMRVer1Parser implements JHFParserBase { buf.append("\n"); continue; } - parseLine(buf.toString(), m_reader, isEscaped); + parseLine(buf.toString(), this.reader, isEscaped); buf = new StringBuffer(); - } while ((line = reader.readLine()) != null); + } + while ((line = reader.readLine()) != null); // flush to tell listener that we have finished parsing logger.info("finish parsing job history file and close"); - m_reader.close(); + this.reader.close(); } catch (Exception ex) { logger.error("can not parse correctly ", ex); throw ex; @@ -121,10 +122,6 @@ public class JHFMRVer1Parser implements JHFParserBase { parseBuffer.put(Keys.valueOf(parts[0]), value); } -// if(conf!=null){ -// parseBuffer.put(Keys.NORM_JOBNAME, conf.get(JPAConstants.JOB_CONF_NORM_JOBNAME_KEY)); -// } - try { l.handle(RecordTypes.valueOf(recType), parseBuffer); } catch (IllegalArgumentException ex) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java index 4ca0449..f21fd41 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java @@ -33,14 +33,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; - @InterfaceAudience.Private @InterfaceStability.Unstable public class JHFMRVer2EventReader extends JHFEventReaderBase { private static final Logger logger = LoggerFactory.getLogger(JHFMRVer2EventReader.class); /** - * Create a new Event Reader + * Create a new Event Reader. * * @throws IOException */ @@ -455,7 +454,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase { String[] tmp = rackname.split("/"); String rack = tmp[tmp.length - 1]; values.put(Keys.RACK, rack); - m_host2RackMapping.put(values.get(Keys.HOSTNAME), rack); + host2RackMapping.put(values.get(Keys.HOSTNAME), rack); } private void handleMapAttemptFailed(Event wrapper) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java index 2ccbf8d..f93f942 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java @@ -33,10 +33,10 @@ import java.io.InputStream; public class JHFMRVer2Parser implements JHFParserBase { private static final Logger logger = LoggerFactory.getLogger(JHFMRVer2Parser.class); - private JHFMRVer2EventReader _reader; + private JHFMRVer2EventReader reader; public JHFMRVer2Parser(JHFMRVer2EventReader reader) { - this._reader = reader; + this.reader = reader; } @SuppressWarnings( {"rawtypes", "deprecation"}) @@ -58,11 +58,11 @@ public class JHFMRVer2Parser implements JHFParserBase { Event wrapper; while ((wrapper = getNextEvent(datumReader, decoder)) != null) { ++eventCtr; - _reader.handleEvent(wrapper); + reader.handleEvent(wrapper); } - _reader.parseConfiguration(); + reader.parseConfiguration(); // don't need put to finally as it's a kind of flushing data - _reader.close(); + reader.close(); logger.info("reader used " + (System.currentTimeMillis() - start) + "ms"); } catch (Exception ioe) { logger.error("Caught exception parsing history file after " + eventCtr + " events", ioe); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java index a2479ab..36a0ef7 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java @@ -20,12 +20,9 @@ package org.apache.eagle.jpm.mr.history.parser; import java.io.InputStream; -/** - * @author yonzhang - */ public interface JHFParserBase { /** - * this method will ensure to close the inputstream + * this method will ensure to close the inputStream. * * @param is * @throws Exception http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java index d5ed97a..64b9fe0 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java @@ -19,15 +19,11 @@ package org.apache.eagle.jpm.mr.history.parser; /** - * used to warn that one job history file has not yet completed writing to hdfs + * used to warn that one job history file has not yet completed writing to hdfs. * This happens when feeder catches up and the history file has not been written into hdfs completely * - * @author yonzhang */ public class JHFWriteNotCompletedException extends Exception { - /** - * - */ private static final long serialVersionUID = -3060175780718218490L; public JHFWriteNotCompletedException(String msg) {