http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
index 52bd8ea..278599d 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
@@ -18,62 +18,63 @@
 
 package org.apache.eagle.jpm.mr.history.crawler;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
-import org.apache.eagle.jpm.util.JobIdFilter;
 import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateLCM;
+import org.apache.eagle.jpm.util.JobIdFilter;
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.util.*;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 /**
- * single thread crawling per driver
- * multiple drivers can achieve parallelism
+ * single thread crawling per driver.
+ * multiple drivers can achieve parallelism.
  *
  */
 public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
     private static final Logger LOG = LoggerFactory.getLogger(JHFCrawlerDriverImpl.class);
 
-    private final static int SLEEP_SECONDS_WHILE_QUEUE_IS_EMPTY = 10;
-    private final static String FORMAT_JOB_PROCESS_DATE = "%4d%02d%02d";
-    private final static Pattern PATTERN_JOB_PROCESS_DATE = Pattern.compile("([0-9]{4})([0-9]{2})([0-9]{2})");
+    private static final int SLEEP_SECONDS_WHILE_QUEUE_IS_EMPTY = 10;
+    private static final String FORMAT_JOB_PROCESS_DATE = "%4d%02d%02d";
+    private static final Pattern PATTERN_JOB_PROCESS_DATE = Pattern.compile("([0-9]{4})([0-9]{2})([0-9]{2})");
 
     private static final int INITIALIZED = 0x0;
     private static final int TODAY = 0x1;
     private static final int BEFORETODAY = 0x10;
-    private final int PROCESSED_JOB_KEEP_DAYS = 5;
+    private static final int PROCESSED_JOB_KEEP_DAYS = 5;
 
-    private int m_flag = INITIALIZED; // 0 not set, 1 TODAY, 2 BEFORETODAY
-    private Deque<Pair<Long, String> > m_processQueue = new LinkedList<>();
-    private Set<String> m_processedJobFileNames = new HashSet<>();
+    private int flag = INITIALIZED; // 0 not set, 1 TODAY, 2 BEFORETODAY
+    private Deque<Pair<Long, String>> processQueue = new LinkedList<>();
+    private Set<String> processedJobFileNames = new HashSet<>();
 
-    private final JobProcessDate m_proceeDate = new JobProcessDate();
-    private boolean m_dryRun;
-    private JHFInputStreamCallback m_reader;
-    protected boolean m_zeroBasedMonth = true;
+    private final JobProcessDate processDate = new JobProcessDate();
+    private boolean dryRun;
+    private JHFInputStreamCallback reader;
+    protected boolean zeroBasedMonth = true;
 
-    private JobHistoryZKStateLCM m_zkStatelcm;
-    private JobHistoryLCM m_jhfLCM;
-    private JobIdFilter m_jobFilter;
-    private int m_partitionId;
-    private TimeZone m_timeZone;
+    private JobHistoryZKStateLCM zkStateLcm;
+    private JobHistoryLCM jhfLCM;
+    private JobIdFilter jobFilter;
+    private int partitionId;
+    private TimeZone timeZone;
 
     public JHFCrawlerDriverImpl(MRHistoryJobConfig.JobHistoryEndpointConfig jobHistoryConfig,
                                 MRHistoryJobConfig.ControlConfig controlConfig, JHFInputStreamCallback reader,
                                 JobHistoryZKStateLCM zkStateLCM,
                                 JobHistoryLCM historyLCM, JobIdFilter jobFilter, int partitionId) throws Exception {
-        this.m_zeroBasedMonth = controlConfig.zeroBasedMonth;
-        this.m_dryRun = controlConfig.dryRun;
-        if (this.m_dryRun)  LOG.info("this is a dry run");
-        this.m_reader = reader;
-        m_jhfLCM = historyLCM;//new JobHistoryDAOImpl(jobHistoryConfig);
-        this.m_zkStatelcm = zkStateLCM;
-        this.m_partitionId = partitionId;
-        this.m_jobFilter = jobFilter;
-        m_timeZone = TimeZone.getTimeZone(controlConfig.timeZone);
+        this.zeroBasedMonth = controlConfig.zeroBasedMonth;
+        this.dryRun = controlConfig.dryRun;
+        if (this.dryRun)  {
+            LOG.info("this is a dry run");
+        }
+        this.reader = reader;
+        jhfLCM = historyLCM;//new JobHistoryDAOImpl(jobHistoryConfig);
+        this.zkStateLcm = zkStateLCM;
+        this.partitionId = partitionId;
+        this.jobFilter = jobFilter;
+        timeZone = TimeZone.getTimeZone(controlConfig.timeZone);
     }
 
     /**
@@ -90,70 +91,70 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
      */
     @Override
     public long crawl() throws Exception {
-        LOG.info("queue size is " + m_processQueue.size());
-        while (m_processQueue.isEmpty()) {
+        LOG.info("queue size is " + processQueue.size());
+        while (processQueue.isEmpty()) {
             // read lastProcessedDate only when it's initialized
-            if (m_flag == INITIALIZED) {
+            if (flag == INITIALIZED) {
                 readAndCacheLastProcessedDate();
             }
-            if (m_flag == BEFORETODAY) {
+            if (flag == BEFORETODAY) {
                 updateProcessDate();
                 clearProcessedJobFileNames();
             }
-            if (m_flag != TODAY) { // advance one day if initialized or BEFORE today
+            if (flag != TODAY) { // advance one day if initialized or BEFORE today
                 advanceOneDay();
             }
 
             if (isToday()) {
-                m_flag = TODAY;
+                flag = TODAY;
             } else {
-                m_flag = BEFORETODAY;
+                flag = BEFORETODAY;
             }
 
-            List<String> serialNumbers = m_jhfLCM.readSerialNumbers(this.m_proceeDate.year, getActualMonth(m_proceeDate.month), this.m_proceeDate.day);
-            List<Pair<Long, String> > allJobHistoryFiles = new LinkedList<>();
+            List<String> serialNumbers = jhfLCM.readSerialNumbers(this.processDate.year, getActualMonth(processDate.month), this.processDate.day);
+            List<Pair<Long, String>> allJobHistoryFiles = new LinkedList<>();
             for (String serialNumber : serialNumbers) {
-                List<Pair<Long, String> > jobHistoryFiles = m_jhfLCM.readFileNames(
-                        this.m_proceeDate.year,
-                        getActualMonth(m_proceeDate.month),
-                        this.m_proceeDate.day,
+                List<Pair<Long, String>> jobHistoryFiles = jhfLCM.readFileNames(
+                        this.processDate.year,
+                        getActualMonth(processDate.month),
+                        this.processDate.day,
                         Integer.parseInt(serialNumber));
                 LOG.info("total number of job history files " + jobHistoryFiles.size());
                 for (Pair<Long, String> jobHistoryFile : jobHistoryFiles) {
-                    if (m_jobFilter.accept(jobHistoryFile.getRight()) && !fileProcessed(jobHistoryFile.getRight())) {
+                    if (jobFilter.accept(jobHistoryFile.getRight()) && !fileProcessed(jobHistoryFile.getRight())) {
                         allJobHistoryFiles.add(jobHistoryFile);
                     }
                 }
                 jobHistoryFiles.clear();
-                LOG.info("after filtering, number of job history files " + m_processQueue.size());
+                LOG.info("after filtering, number of job history files " + processQueue.size());
             }
 
             Collections.sort(allJobHistoryFiles,
-                    new Comparator<Pair<Long, String>>() {
-                        @Override
-                        public int compare(Pair<Long, String> o1, Pair<Long, String> o2) {
-                            if (o1.getLeft() > o2.getLeft()) return 1;
-                            else if (o1.getLeft() == o2.getLeft()) return 0;
-                            else return -1;
-                        }
+                (o1, o2) -> {
+                    if (o1.getLeft() > o2.getLeft()) {
+                        return 1;
+                    } else if (o1.getLeft() == o2.getLeft()) {
+                        return 0;
+                    } else {
+                        return -1;
                     }
+                }
             );
             for (Pair<Long, String> jobHistoryFile : allJobHistoryFiles) {
-                m_processQueue.add(jobHistoryFile);
+                processQueue.add(jobHistoryFile);
             }
 
             allJobHistoryFiles.clear();
 
-            if (m_processQueue.isEmpty()) {
+            if (processQueue.isEmpty()) {
                 Thread.sleep(SLEEP_SECONDS_WHILE_QUEUE_IS_EMPTY * 1000);
             } else {
-                LOG.info("queue size after populating is now : " + m_processQueue.size());
+                LOG.info("queue size after populating is now : " + processQueue.size());
             }
         }
         // start to process job history file
-        Pair<Long, String> item = m_processQueue.pollFirst();
+        Pair<Long, String> item = processQueue.pollFirst();
         String jobHistoryFile = item.getRight();
-        Long modifiedTime = item.getLeft();
         if (jobHistoryFile == null) { // terminate this round of crawling when the queue is empty
             LOG.info("process queue is empty, ignore this round");
             return -1;
@@ -168,33 +169,34 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
             LOG.warn("illegal job history file name : " + jobHistoryFile);
             return -1;
         }
-        if (!m_dryRun) {
-            m_jhfLCM.readFileContent(
-                    m_proceeDate.year,
-                    getActualMonth(m_proceeDate.month),
-                    m_proceeDate.day,
+        if (!dryRun) {
+            jhfLCM.readFileContent(
+                    processDate.year,
+                    getActualMonth(processDate.month),
+                    processDate.day,
                     Integer.valueOf(serialNumber),
                     jobHistoryFile,
-                    m_reader);
+                reader);
         }
-        m_zkStatelcm.addProcessedJob(String.format(FORMAT_JOB_PROCESS_DATE,
-                this.m_proceeDate.year,
-                this.m_proceeDate.month + 1,
-                this.m_proceeDate.day),
+        zkStateLcm.addProcessedJob(String.format(FORMAT_JOB_PROCESS_DATE,
+                this.processDate.year,
+                this.processDate.month + 1,
+                this.processDate.day),
                 jobHistoryFile);
-        m_processedJobFileNames.add(jobHistoryFile);
+        processedJobFileNames.add(jobHistoryFile);
 
+        Long modifiedTime = item.getLeft();
         return modifiedTime;
     }
 
     private void updateProcessDate() throws Exception {
-        String line = String.format(FORMAT_JOB_PROCESS_DATE, this.m_proceeDate.year,
-                this.m_proceeDate.month + 1, this.m_proceeDate.day);
-        m_zkStatelcm.updateProcessedDate(m_partitionId, line);
+        String line = String.format(FORMAT_JOB_PROCESS_DATE, this.processDate.year,
+                this.processDate.month + 1, this.processDate.day);
+        zkStateLcm.updateProcessedDate(partitionId, line);
     }
 
-    private int getActualMonth(int month){
-        return m_zeroBasedMonth ? m_proceeDate.month : m_proceeDate.month + 1;
+    private int getActualMonth(int month) {
+        return zeroBasedMonth ? processDate.month : processDate.month + 1;
     }
 
     private static class JobProcessDate {
@@ -204,37 +206,37 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
     }
 
     private void clearProcessedJobFileNames() {
-        m_processedJobFileNames.clear();
+        processedJobFileNames.clear();
     }
 
     private void readAndCacheLastProcessedDate() throws Exception {
-        String lastProcessedDate = m_zkStatelcm.readProcessedDate(m_partitionId);
+        String lastProcessedDate = zkStateLcm.readProcessedDate(partitionId);
         Matcher m = PATTERN_JOB_PROCESS_DATE.matcher(lastProcessedDate);
         if (m.find() && m.groupCount() == 3) {
-            this.m_proceeDate.year = Integer.parseInt(m.group(1));
-            this.m_proceeDate.month = Integer.parseInt(m.group(2)) - 1; // zero based month
-            this.m_proceeDate.day = Integer.parseInt(m.group(3));
+            this.processDate.year = Integer.parseInt(m.group(1));
+            this.processDate.month = Integer.parseInt(m.group(2)) - 1; // zero based month
+            this.processDate.day = Integer.parseInt(m.group(3));
         } else {
             throw new IllegalStateException("job lastProcessedDate must have format YYYYMMDD " + lastProcessedDate);
         }
 
-        GregorianCalendar cal = new GregorianCalendar(m_timeZone);
-        cal.set(this.m_proceeDate.year, this.m_proceeDate.month, this.m_proceeDate.day, 0, 0, 0);
+        GregorianCalendar cal = new GregorianCalendar(timeZone);
+        cal.set(this.processDate.year, this.processDate.month, this.processDate.day, 0, 0, 0);
         cal.add(Calendar.DATE, 1);
-        List<String> list = m_zkStatelcm.readProcessedJobs(String.format(FORMAT_JOB_PROCESS_DATE, cal.get(Calendar.YEAR),
+        List<String> list = zkStateLcm.readProcessedJobs(String.format(FORMAT_JOB_PROCESS_DATE, cal.get(Calendar.YEAR),
                 cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DAY_OF_MONTH)));
         if (list != null) {
-            this.m_processedJobFileNames = new HashSet<>(list);
+            this.processedJobFileNames = new HashSet<>(list);
         }
     }
 
     private void advanceOneDay() throws Exception {
-        GregorianCalendar cal = new GregorianCalendar(m_timeZone);
-        cal.set(this.m_proceeDate.year, this.m_proceeDate.month, this.m_proceeDate.day, 0, 0, 0);
+        GregorianCalendar cal = new GregorianCalendar(timeZone);
+        cal.set(this.processDate.year, this.processDate.month, this.processDate.day, 0, 0, 0);
         cal.add(Calendar.DATE, 1);
-        this.m_proceeDate.year = cal.get(Calendar.YEAR);
-        this.m_proceeDate.month = cal.get(Calendar.MONTH);
-        this.m_proceeDate.day = cal.get(Calendar.DAY_OF_MONTH);
+        this.processDate.year = cal.get(Calendar.YEAR);
+        this.processDate.month = cal.get(Calendar.MONTH);
+        this.processDate.day = cal.get(Calendar.DAY_OF_MONTH);
 
         try {
             clearProcessedJob(cal);
@@ -249,29 +251,30 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
         cal.add(Calendar.DATE, -1 - PROCESSED_JOB_KEEP_DAYS);
         String line = String.format(FORMAT_JOB_PROCESS_DATE, cal.get(Calendar.YEAR),
                 cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DAY_OF_MONTH));
-        m_zkStatelcm.truncateProcessedJob(line);
+        zkStateLcm.truncateProcessedJob(line);
     }
 
     private boolean isToday() {
-        GregorianCalendar today = new GregorianCalendar(m_timeZone);
+        GregorianCalendar today = new GregorianCalendar(timeZone);
 
-        if (today.get(Calendar.YEAR) == this.m_proceeDate.year
-                && today.get(Calendar.MONTH) == this.m_proceeDate.month
-                && today.get(Calendar.DAY_OF_MONTH) == this.m_proceeDate.day)
+        if (today.get(Calendar.YEAR) == this.processDate.year
+                && today.get(Calendar.MONTH) == this.processDate.month
+                && today.get(Calendar.DAY_OF_MONTH) == this.processDate.day) {
             return true;
-
+        }
         return false;
     }
 
     /**
-     * check if this file was already processed
+     * check if this file was already processed.
      *
      * @param fileName
      * @return
      */
     private boolean fileProcessed(String fileName) {
-        if (m_processedJobFileNames.contains(fileName))
+        if (processedJobFileNames.contains(fileName)) {
             return true;
+        }
         return false;
     }
 }
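
A side note on the Comparator-to-lambda change above: the refactor keeps the
original "o1.getLeft() == o2.getLeft()" branch, which compares the boxed Long
keys by reference rather than by value, so equal timestamps outside the small
integer cache range may not compare as 0. A minimal sketch of an equivalent
sort without that pitfall (hypothetical, not part of this commit; it assumes
the same Java 8 level the lambda syntax already requires):

    // Unbox each getLeft() once and compare the primitive longs by value.
    allJobHistoryFiles.sort(Comparator.comparingLong(Pair::getLeft));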

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFInputStreamCallback.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFInputStreamCallback.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFInputStreamCallback.java
index 52a62a4..3e5ce1f 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFInputStreamCallback.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFInputStreamCallback.java
@@ -24,11 +24,11 @@ import java.io.InputStream;
 import java.io.Serializable;
 
 /**
- * callback when job history file input stream is ready
+ * callback when job history file input stream is ready.
  */
 public interface JHFInputStreamCallback extends Serializable {
     /**
-     * this is called when job file string and job configuration file is ready
+     * this is called when job file string and job configuration file is ready.
      * @param is
      * @param configuration
      * @throws Exception

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilter.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilter.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilter.java
index 66dbce1..2c866f6 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilter.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilter.java
@@ -24,9 +24,14 @@ import java.util.regex.Pattern;
 
 public interface JobHistoryContentFilter extends Serializable {
     boolean acceptJobFile();
+
     boolean acceptJobConfFile();
+
     List<Pattern> getMustHaveJobConfKeyPatterns();
+
     List<Pattern> getJobConfKeyInclusionPatterns();
+
     List<Pattern> getJobConfKeyExclusionPatterns();
+    
     String getJobNameKey();
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterBuilder.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterBuilder.java
index 65b8dab..c43c366 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterBuilder.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterBuilder.java
@@ -27,56 +27,57 @@ import java.util.List;
 import java.util.regex.Pattern;
 
 public class JobHistoryContentFilterBuilder {
-    private final static Logger LOG = LoggerFactory.getLogger(JobHistoryContentFilterBuilder.class);
+    private static final Logger LOG = LoggerFactory.getLogger(JobHistoryContentFilterBuilder.class);
 
-    private boolean m_acceptJobFile;
-    private boolean m_acceptJobConfFile;
-    private List<Pattern> m_mustHaveJobConfKeyPatterns;
-    private List<Pattern> m_jobConfKeyInclusionPatterns;
-    private List<Pattern> m_jobConfKeyExclusionPatterns;
+    private boolean acceptJobFile;
+    private boolean acceptJobConfFile;
+    private List<Pattern> mustHaveJobConfKeyPatterns;
+    private List<Pattern> jobConfKeyInclusionPatterns;
+    private List<Pattern> jobConfKeyExclusionPatterns;
 
     private String jobNameKey;
 
-    public static JobHistoryContentFilterBuilder newBuilder(){
+    public static JobHistoryContentFilterBuilder newBuilder() {
         return new JobHistoryContentFilterBuilder();
     }
 
     public JobHistoryContentFilterBuilder acceptJobFile() {
-        this.m_acceptJobFile = true;
+        this.acceptJobFile = true;
         return this;
     }
 
     public JobHistoryContentFilterBuilder acceptJobConfFile() {
-        this.m_acceptJobConfFile = true;
+        this.acceptJobConfFile = true;
         return this;
     }
 
     public JobHistoryContentFilterBuilder mustHaveJobConfKeyPatterns(Pattern ...patterns) {
-        m_mustHaveJobConfKeyPatterns = Arrays.asList(patterns);
-        if (m_jobConfKeyInclusionPatterns != null) {
+        mustHaveJobConfKeyPatterns = Arrays.asList(patterns);
+        if (jobConfKeyInclusionPatterns != null) {
             List<Pattern> list = new ArrayList<Pattern>();
-            list.addAll(m_jobConfKeyInclusionPatterns);
+            list.addAll(jobConfKeyInclusionPatterns);
             list.addAll(Arrays.asList(patterns));
-            m_jobConfKeyInclusionPatterns = list;
+            jobConfKeyInclusionPatterns = list;
+        } else {
+            jobConfKeyInclusionPatterns = Arrays.asList(patterns);
         }
-        else
-            m_jobConfKeyInclusionPatterns = Arrays.asList(patterns);
         return this;
     }
 
     public JobHistoryContentFilterBuilder includeJobKeyPatterns(Pattern ... patterns) {
-        if (m_jobConfKeyInclusionPatterns != null) {
+        if (jobConfKeyInclusionPatterns != null) {
             List<Pattern> list = new ArrayList<Pattern>();
-            list.addAll(m_jobConfKeyInclusionPatterns);
+            list.addAll(jobConfKeyInclusionPatterns);
             list.addAll(Arrays.asList(patterns));
-            m_jobConfKeyInclusionPatterns = list;
-        } else
-            m_jobConfKeyInclusionPatterns = Arrays.asList(patterns);
+            jobConfKeyInclusionPatterns = list;
+        } else {
+            jobConfKeyInclusionPatterns = Arrays.asList(patterns);
+        }
         return this;
     }
 
     public JobHistoryContentFilterBuilder excludeJobKeyPatterns(Pattern ...patterns) {
-        m_jobConfKeyExclusionPatterns = Arrays.asList(patterns);
+        jobConfKeyExclusionPatterns = Arrays.asList(patterns);
         return this;
     }
 
@@ -87,11 +88,11 @@ public class JobHistoryContentFilterBuilder {
 
     public JobHistoryContentFilter build() {
         JobHistoryContentFilterImpl filter = new JobHistoryContentFilterImpl();
-        filter.setAcceptJobFile(m_acceptJobFile);
-        filter.setAcceptJobConfFile(m_acceptJobConfFile);
-        filter.setMustHaveJobConfKeyPatterns(m_mustHaveJobConfKeyPatterns);
-        filter.setJobConfKeyInclusionPatterns(m_jobConfKeyInclusionPatterns);
-        filter.setJobConfKeyExclusionPatterns(m_jobConfKeyExclusionPatterns);
+        filter.setAcceptJobFile(acceptJobFile);
+        filter.setAcceptJobConfFile(acceptJobConfFile);
+        filter.setMustHaveJobConfKeyPatterns(mustHaveJobConfKeyPatterns);
+        filter.setJobConfKeyInclusionPatterns(jobConfKeyInclusionPatterns);
+        filter.setJobConfKeyExclusionPatterns(jobConfKeyExclusionPatterns);
         filter.setJobNameKey(jobNameKey);
         LOG.info("job history content filter:" + filter);
         return filter;
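
For reference, a hypothetical usage of the builder restyled above (the
patterns are illustrative, not taken from this commit):

    JobHistoryContentFilter filter = JobHistoryContentFilterBuilder.newBuilder()
            .acceptJobFile()
            .acceptJobConfFile()
            // keys the job configuration must contain
            .mustHaveJobConfKeyPatterns(Pattern.compile("mapreduce\\.job\\.name"))
            // keys to keep or drop when the configuration is persisted
            .includeJobKeyPatterns(Pattern.compile("mapreduce\\..*"))
            .excludeJobKeyPatterns(Pattern.compile(".*\\.password"))
            .build();

As the method bodies above show, mustHaveJobConfKeyPatterns also appends its
patterns to the inclusion list.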

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterImpl.java
index 5e7a856..5340372 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterImpl.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterImpl.java
@@ -22,37 +22,37 @@ import java.util.List;
 import java.util.regex.Pattern;
 
 public class JobHistoryContentFilterImpl implements JobHistoryContentFilter {
-    private boolean m_acceptJobFile;
-    private boolean m_acceptJobConfFile;
-    private List<Pattern> m_mustHaveJobConfKeyPatterns;
-    private List<Pattern> m_jobConfKeyInclusionPatterns;
-    private List<Pattern> m_jobConfKeyExclusionPatterns;
+    private boolean acceptJobFile;
+    private boolean acceptJobConfFile;
+    private List<Pattern> mustHaveJobConfKeyPatterns;
+    private List<Pattern> jobConfKeyInclusionPatterns;
+    private List<Pattern> jobConfKeyExclusionPatterns;
 
     private String jobNameKey;
 
     @Override
     public boolean acceptJobFile() {
-        return m_acceptJobFile;
+        return acceptJobFile;
     }
 
     @Override
     public boolean acceptJobConfFile() {
-        return m_acceptJobConfFile;
+        return acceptJobConfFile;
     }
 
     @Override
     public List<Pattern> getMustHaveJobConfKeyPatterns() {
-        return m_mustHaveJobConfKeyPatterns;
+        return mustHaveJobConfKeyPatterns;
     }
 
     @Override
     public List<Pattern> getJobConfKeyInclusionPatterns() {
-        return m_jobConfKeyInclusionPatterns;
+        return jobConfKeyInclusionPatterns;
     }
 
     @Override
     public List<Pattern> getJobConfKeyExclusionPatterns() {
-        return m_jobConfKeyExclusionPatterns;
+        return jobConfKeyExclusionPatterns;
     }
 
     @Override
@@ -65,40 +65,40 @@ public class JobHistoryContentFilterImpl implements JobHistoryContentFilter {
     }
 
     public void setAcceptJobFile(boolean acceptJobFile) {
-        this.m_acceptJobFile = acceptJobFile;
+        this.acceptJobFile = acceptJobFile;
     }
 
     public void setAcceptJobConfFile(boolean acceptJobConfFile) {
-        this.m_acceptJobConfFile = acceptJobConfFile;
+        this.acceptJobConfFile = acceptJobConfFile;
     }
 
     public void setJobConfKeyInclusionPatterns(
             List<Pattern> jobConfKeyInclusionPatterns) {
-        this.m_jobConfKeyInclusionPatterns = jobConfKeyInclusionPatterns;
+        this.jobConfKeyInclusionPatterns = jobConfKeyInclusionPatterns;
     }
 
     public void setJobConfKeyExclusionPatterns(
             List<Pattern> jobConfKeyExclusionPatterns) {
-        this.m_jobConfKeyExclusionPatterns = jobConfKeyExclusionPatterns;
+        this.jobConfKeyExclusionPatterns = jobConfKeyExclusionPatterns;
     }
 
     public void setMustHaveJobConfKeyPatterns(List<Pattern> mustHaveJobConfKeyPatterns) {
-        this.m_mustHaveJobConfKeyPatterns = mustHaveJobConfKeyPatterns;
+        this.mustHaveJobConfKeyPatterns = mustHaveJobConfKeyPatterns;
     }
 
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("job history file:");
-        sb.append(m_acceptJobFile);
+        sb.append(acceptJobFile);
         sb.append(", job config file:");
-        sb.append(m_acceptJobConfFile);
-        if(m_acceptJobConfFile){
+        sb.append(acceptJobConfFile);
+        if (acceptJobConfFile) {
             sb.append(", must contain keys:");
-            sb.append(m_mustHaveJobConfKeyPatterns);
+            sb.append(mustHaveJobConfKeyPatterns);
             sb.append(", include keys:");
-            sb.append(m_jobConfKeyInclusionPatterns);
+            sb.append(jobConfKeyInclusionPatterns);
             sb.append(", exclude keys:");
-            sb.append(m_jobConfKeyExclusionPatterns);
+            sb.append(jobConfKeyExclusionPatterns);
         }
         return sb.toString();
     }
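
Given the appends in toString() above, a filter accepting both file types
would describe itself roughly as follows (values illustrative):

    job history file:true, job config file:true, must contain keys:[mapreduce.job.name], include keys:[mapreduce\..*], exclude keys:[.*\.password]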

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java
index cfd5994..0441f1f 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java
@@ -18,6 +18,8 @@
 
 package org.apache.eagle.jpm.mr.history.crawler;
 
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.JobHistoryEndpointConfig;
+import org.apache.eagle.jpm.util.HDFSUtil;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -32,36 +34,32 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
-import org.apache.eagle.jpm.util.HDFSUtil;
-import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.JobHistoryEndpointConfig;
 
 public class JobHistoryDAOImpl extends AbstractJobHistoryDAO {
     private static final Logger LOG = LoggerFactory.getLogger(JobHistoryDAOImpl.class);
 
-    private Configuration m_conf = new Configuration();
+    private Configuration conf = new Configuration();
 
-    private FileSystem m_hdfs;
+    private FileSystem hdfs;
 
     public JobHistoryDAOImpl(JobHistoryEndpointConfig endpointConfig) throws Exception {
         super(endpointConfig.basePath, endpointConfig.pathContainsJobTrackerName, endpointConfig.jobTrackerName);
-        this.m_conf.set("fs.defaultFS", endpointConfig.nnEndpoint);
-        this.m_conf.setBoolean("fs.hdfs.impl.disable.cache", true);
+        this.conf.set("fs.defaultFS", endpointConfig.nnEndpoint);
+        this.conf.setBoolean("fs.hdfs.impl.disable.cache", true);
         if (!endpointConfig.principal.equals("")) {
-            this.m_conf.set("hdfs.kerberos.principal", endpointConfig.principal);
-            this.m_conf.set("hdfs.keytab.file", endpointConfig.keyTab);
+            this.conf.set("hdfs.kerberos.principal", endpointConfig.principal);
+            this.conf.set("hdfs.keytab.file", endpointConfig.keyTab);
         }
         LOG.info("file system:" + endpointConfig.nnEndpoint);
-        m_hdfs = HDFSUtil.getFileSystem(m_conf);
+        hdfs = HDFSUtil.getFileSystem(conf);
     }
 
     @Override
     public void freshFileSystem() throws Exception {
         try {
-            m_hdfs.close();
-        } catch (Exception e) {
-
+            hdfs.close();
         } finally {
-            m_hdfs = HDFSUtil.getFileSystem(m_conf);
+            hdfs = HDFSUtil.getFileSystem(conf);
         }
     }
 
@@ -70,20 +68,17 @@ public class JobHistoryDAOImpl extends AbstractJobHistoryDAO {
         String latestJobTrackerName = null;
         try {
             Path hdfsFile = new Path(basePath);
-            FileStatus[] files = m_hdfs.listStatus(hdfsFile);
+            FileStatus[] files = hdfs.listStatus(hdfsFile);
 
             // Sort by modification time as order of desc
-            Arrays.sort(files, new Comparator<FileStatus>() {
-                @Override
-                public int compare(FileStatus o1, FileStatus o2) {
-                    long comp = parseJobTrackerNameTimestamp(o1.getPath().toString()) - parseJobTrackerNameTimestamp(o2.getPath().toString());
-                    if (comp > 0l) {
-                        return -1;
-                    } else if (comp < 0l) {
-                        return 1;
-                    }
-                    return 0;
+            Arrays.sort(files, (o1, o2) -> {
+                long comp = parseJobTrackerNameTimestamp(o1.getPath().toString()) - parseJobTrackerNameTimestamp(o2.getPath().toString());
+                if (comp > 0L) {
+                    return -1;
+                } else if (comp < 0L) {
+                    return 1;
                 }
+                return 0;
             });
 
             for (FileStatus fs : files) {
@@ -94,7 +89,7 @@ public class JobHistoryDAOImpl extends AbstractJobHistoryDAO {
                     break;
                 }
             }
-        } catch(Exception ex) {
+        } catch (Exception ex) {
             LOG.error("fail read job tracker name " + basePath, ex);
             throw ex;
         }
@@ -108,7 +103,7 @@ public class JobHistoryDAOImpl extends AbstractJobHistoryDAO {
         LOG.info("crawl serial numbers under one day : " + dailyPath);
         try {
             Path hdfsFile = new Path(dailyPath);
-            FileStatus[] files = m_hdfs.listStatus(hdfsFile);
+            FileStatus[] files = hdfs.listStatus(hdfsFile);
             for (FileStatus fs : files) {
                 if (fs.isDir()) {
                     serialNumbers.add(fs.getPath().getName());
@@ -125,7 +120,8 @@ public class JobHistoryDAOImpl extends AbstractJobHistoryDAO {
         }
         StringBuilder sb = new StringBuilder();
         for (String sn : serialNumbers) {
-            sb.append(sn);sb.append(",");
+            sb.append(sn);
+            sb.append(",");
         }
         LOG.info("crawled serialNumbers: " + sb);
         return serialNumbers;
@@ -133,18 +129,19 @@ public class JobHistoryDAOImpl extends AbstractJobHistoryDAO {
 
     @SuppressWarnings("deprecation")
     @Override
-    public List<Pair<Long, String> > readFileNames(int year, int month, int day, int serialNumber) throws Exception {
+    public List<Pair<Long, String>> readFileNames(int year, int month, int day, int serialNumber) throws Exception {
         LOG.info("crawl file names under one serial number : " + year + "/" + month + "/" + day + ":" + serialNumber);
-        List<Pair<Long, String> > jobFileNames = new ArrayList<>();
+        List<Pair<Long, String>> jobFileNames = new ArrayList<>();
         String serialPath = buildWholePathToSerialNumber(year, month, day, serialNumber);
         try {
             Path hdfsFile = new Path(serialPath);
             // filter those files which is job configuration file in xml format
-            FileStatus[] files = m_hdfs.listStatus(hdfsFile, new PathFilter(){
+            FileStatus[] files = hdfs.listStatus(hdfsFile, new PathFilter() {
                 @Override
-                public boolean accept(Path path){
-                    if (path.getName().endsWith(".xml"))
+                public boolean accept(Path path) {
+                    if (path.getName().endsWith(".xml")) {
                         return false;
+                    }
                     return true;
                 }
             });
@@ -156,7 +153,8 @@ public class JobHistoryDAOImpl extends AbstractJobHistoryDAO {
             if (LOG.isDebugEnabled()) {
                 StringBuilder sb = new StringBuilder();
                 for (Pair<Long, String> sn : jobFileNames) {
-                    sb.append(sn.getRight());sb.append(",");
+                    sb.append(sn.getRight());
+                    sb.append(",");
                 }
                 LOG.debug("crawled: " + sb);
             }
@@ -168,7 +166,7 @@ public class JobHistoryDAOImpl extends AbstractJobHistoryDAO {
     }
 
     /**
-     * it's the responsibility of caller to close input stream
+     * it's the responsibility of caller to close input stream.
      */
     @Override
     public InputStream getJHFFileContentAsStream(int year, int month, int day, int serialNumber, String jobHistoryFileName) throws Exception {
@@ -176,26 +174,28 @@ public class JobHistoryDAOImpl extends AbstractJobHistoryDAO {
         LOG.info("Read job history file: " + path);
         try {
             Path hdfsFile = new Path(path);
-            return m_hdfs.open(hdfsFile);
-        } catch(Exception ex) {
+            return hdfs.open(hdfsFile);
+        } catch (Exception ex) {
             LOG.error("fail getting hdfs file inputstream " + path, ex);
             throw ex;
         }
     }
 
     /**
-     * it's the responsibility of caller to close input stream
+     * it's the responsibility of caller to close input stream.
      */
     @Override
     public InputStream getJHFConfContentAsStream(int year, int month, int day, int serialNumber, String jobHistoryFileName) throws Exception {
         String path = buildWholePathToJobConfFile(year, month, day, serialNumber,jobHistoryFileName);
-        if (path  == null) return null;
+        if (path  == null) {
+            return null;
+        }
 
         LOG.info("Read job conf file: " + path);
         try {
             Path hdfsFile = new Path(path);
-            return m_hdfs.open(hdfsFile);
-        } catch(Exception ex) {
+            return hdfs.open(hdfsFile);
+        } catch (Exception ex) {
             LOG.error("fail getting job configuration input stream from " + path, ex);
             throw ex;
         }
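
One more candidate the commit leaves as an anonymous class: PathFilter
declares a single accept(Path) method, so the lambda conversion applied to
the Comparator above would work for the .xml filter as well. A minimal
sketch (hypothetical, not part of this commit):

    // Skip job configuration files, which sit as .xml next to the
    // job history files in the same serial-number directory.
    FileStatus[] files = hdfs.listStatus(hdfsFile,
            path -> !path.getName().endsWith(".xml"));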

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryLCM.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryLCM.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryLCM.java
index de8d3f7..f428b11 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryLCM.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryLCM.java
@@ -25,12 +25,12 @@ import java.util.List;
 
 /**
  * Define various operations on job history file resource for lifecycle management
- *
+ * <p></p>
  * The job history file directory structure supported is as follows:
- * <basePath>/<jobTrackerName>/<year>/<month>/<day>/<serialNumber>/<jobHistoryFileName>
- *
- * In some hadoop version, <jobTrackerName> is not included
- *
+ * basePath/jobTrackerName/year/month/day/serialNumber/jobHistoryFileName
+ * <p></p>
+ * In some hadoop version, jobTrackerName is not included
+ * <p></p>
  * The operations involved in resource read
  * - list job tracker names under basePath (mostly basePath is configured in entry mapreduce.jobhistory.done-dir of mapred-site.xml)
  * - list serial numbers under one day
@@ -40,7 +40,9 @@ import java.util.List;
  */
 public interface JobHistoryLCM {
     String calculateJobTrackerName(String basePath) throws Exception;
+
     /**
+     * ...
      * @param year
      * @param month 0-based or 1-based month depending on hadoop cluster setting
      * @param day
@@ -48,7 +50,9 @@ public interface JobHistoryLCM {
      * @throws Exception
      */
     List<String> readSerialNumbers(int year, int month, int day) throws Exception;
+
     /**
+     * ...
      * @param year
      * @param month 0-based or 1-based month depending on hadoop cluster setting
      * @param day
@@ -56,8 +60,10 @@ public interface JobHistoryLCM {
      * @return
      * @throws Exception
      */
-    List<Pair<Long, String> > readFileNames(int year, int month, int day, int serialNumber) throws Exception;
+    List<Pair<Long, String>> readFileNames(int year, int month, int day, int serialNumber) throws Exception;
+
     /**
+     * ...
      * @param year
      * @param month 0-based or 1-based month depending on hadoop cluster setting
      * @param day
@@ -67,7 +73,9 @@ public interface JobHistoryLCM {
      * @throws Exception
      */
     void readFileContent(int year, int month, int day, int serialNumber, String jobHistoryFileName, JHFInputStreamCallback reader) throws Exception;
+
     /**
+     * ...
      * @param year
      * @param month 0-based or 1-based month depending on hadoop cluster setting
      * @param day
@@ -77,10 +85,8 @@ public interface JobHistoryLCM {
      * @throws Exception
      */
     InputStream getJHFFileContentAsStream(int year, int month, int day, int serialNumber, String jobHistoryFileName) throws Exception;
+
     InputStream getJHFConfContentAsStream(int year, int month, int day, int serialNumber, String jobConfFileName) throws Exception;
 
-    /**
-     *
-     */
     void freshFileSystem() throws Exception;
 }
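
The layout described in the javadoc above composes into concrete paths along
these lines (a sketch; the zero-padding widths are assumptions, only the
segment order comes from the javadoc):

    // <basePath>/<jobTrackerName>/<year>/<month>/<day>/<serialNumber>/<jobHistoryFileName>
    String path = basePath + "/" + jobTrackerName + "/"
            + String.format("%4d/%02d/%02d/%06d", year, month, day, serialNumber)
            + "/" + jobHistoryFileName;

Whether month is 0-based or 1-based here depends on the hadoop cluster
setting, which is why each month parameter above carries that caveat.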

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java
index 211dd9d..a7dc9a8 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java
@@ -18,18 +18,18 @@
 
 package org.apache.eagle.jpm.mr.history.crawler;
 
-import backtype.storm.spout.SpoutOutputCollector;
 import org.apache.eagle.dataproc.impl.storm.ValuesArray;
+import backtype.storm.spout.SpoutOutputCollector;
 
 public class JobHistorySpoutCollectorInterceptor implements EagleOutputCollector {
-    private SpoutOutputCollector m_collector;
+    private SpoutOutputCollector collector;
 
     public void setSpoutOutputCollector(SpoutOutputCollector collector) {
-        this.m_collector = collector;
+        this.collector = collector;
     }
 
     @Override
     public void collect(ValuesArray t) {
-        m_collector.emit(t);
+        collector.emit(t);
     }
 }
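
A sketch of how this adapter is typically wired from a Storm spout's open()
method (hypothetical; only setSpoutOutputCollector and collect come from the
class above):

    // Hand Storm's collector to the interceptor once the spout opens, so
    // downstream crawler code can emit through EagleOutputCollector.
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        interceptor.setSpoutOutputCollector(collector);
    }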

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java
index 892c2ea..dec3f82 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java
@@ -23,19 +23,17 @@ import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
 
 /**
  * generalizing this listener would decouple entity creation and entity handling, also will help unit testing.
- *
- * @author yonzhang
  */
 public interface HistoryJobEntityCreationListener {
     /**
-     * job entity created event
+     * job entity created event.
      *
      * @param entity
      */
     void jobEntityCreated(JobBaseAPIEntity entity) throws Exception;
 
     /**
-     * for streaming processing, flush would help commit the last several entities
+     * for streaming processing, flush would help commit the last several entities.
      */
     void flush() throws Exception;
 }
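
As the javadoc notes, generalizing the listener decouples entity creation
from entity handling and helps unit testing. A hypothetical in-memory
implementation usable as a test double (not part of this commit):

    public class CollectingListener implements HistoryJobEntityCreationListener {
        private final List<JobBaseAPIEntity> collected = new ArrayList<>();

        @Override
        public void jobEntityCreated(JobBaseAPIEntity entity) {
            collected.add(entity); // capture instead of persisting
        }

        @Override
        public void flush() {
            // nothing buffered here; a real sink would commit pending entities
        }

        public List<JobBaseAPIEntity> getCollected() {
            return collected;
        }
    }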

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
index 82e305a..5d3d5b4 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
@@ -41,30 +41,30 @@ import java.util.regex.Pattern;
  */
 public abstract class JHFEventReaderBase extends JobEntityCreationPublisher implements Closeable {
     private static final Logger LOG = LoggerFactory.getLogger(JHFEventReaderBase.class);
-    protected Map<String, String> m_baseTags;
-    protected JobEventAPIEntity m_jobSubmitEventEntity;
-    protected JobEventAPIEntity m_jobLaunchEventEntity;
-    protected int m_numTotalMaps;
-    protected int m_numTotalReduces;
-    protected JobEventAPIEntity m_jobFinishEventEntity;
-    protected JobExecutionAPIEntity m_jobExecutionEntity;
-    protected Map<String, Long> m_taskStartTime;
+    protected Map<String, String> baseTags;
+    protected JobEventAPIEntity jobSubmitEventEntity;
+    protected JobEventAPIEntity jobLaunchEventEntity;
+    protected int numTotalMaps;
+    protected int numTotalReduces;
+    protected JobEventAPIEntity jobFinishEventEntity;
+    protected JobExecutionAPIEntity jobExecutionEntity;
+    protected Map<String, Long> taskStartTime;
     // taskAttemptID to task attempt startTime
-    protected Map<String, Long> m_taskAttemptStartTime;
+    protected Map<String, Long> taskAttemptStartTime;
 
     // taskID to host mapping, for task it's the host where the last attempt runs on
-    protected Map<String, String> m_taskRunningHosts;
+    protected Map<String, String> taskRunningHosts;
     // hostname to rack mapping
-    protected Map<String, String> m_host2RackMapping;
+    protected Map<String, String> host2RackMapping;
 
-    protected String m_jobId;
-    protected String m_jobName;
-    protected String m_jobType;
-    protected String m_jobDefId;
-    protected String m_user;
-    protected String m_queueName;
-    protected Long m_jobLauchTime;
-    protected JobHistoryContentFilter m_filter;
+    protected String jobId;
+    protected String jobName;
+    protected String jobType;
+    protected String jobDefId;
+    protected String user;
+    protected String queueName;
+    protected Long jobLaunchTime;
+    protected JobHistoryContentFilter filter;
 
     protected final List<HistoryJobEntityLifecycleListener> jobEntityLifecycleListeners = new ArrayList<>();
 
@@ -90,42 +90,42 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
     }
 
     /**
-     * baseTags stores the basic tag name values which might be used for persisting various entities
+     * baseTags stores the basic tag name values which might be used for persisting various entities.
      * baseTags includes: cluster, datacenter and jobName
      * baseTags are used for all job/task related entities
      *
      * @param baseTags
      */
     public JHFEventReaderBase(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
-        this.m_filter = filter;
+        this.filter = filter;
 
-        this.m_baseTags = baseTags;
-        m_jobSubmitEventEntity = new JobEventAPIEntity();
-        m_jobSubmitEventEntity.setTags(new HashMap<>(baseTags));
+        this.baseTags = baseTags;
+        jobSubmitEventEntity = new JobEventAPIEntity();
+        jobSubmitEventEntity.setTags(new HashMap<>(baseTags));
 
-        m_jobLaunchEventEntity = new JobEventAPIEntity();
-        m_jobLaunchEventEntity.setTags(new HashMap<>(baseTags));
+        jobLaunchEventEntity = new JobEventAPIEntity();
+        jobLaunchEventEntity.setTags(new HashMap<>(baseTags));
 
-        m_jobFinishEventEntity = new JobEventAPIEntity();
-        m_jobFinishEventEntity.setTags(new HashMap<>(baseTags));
+        jobFinishEventEntity = new JobEventAPIEntity();
+        jobFinishEventEntity.setTags(new HashMap<>(baseTags));
 
-        m_jobExecutionEntity = new JobExecutionAPIEntity();
-        m_jobExecutionEntity.setTags(new HashMap<>(baseTags));
+        jobExecutionEntity = new JobExecutionAPIEntity();
+        jobExecutionEntity.setTags(new HashMap<>(baseTags));
 
-        m_taskRunningHosts = new HashMap<>();
+        taskRunningHosts = new HashMap<>();
 
-        m_host2RackMapping = new HashMap<>();
+        host2RackMapping = new HashMap<>();
 
-        m_taskStartTime = new HashMap<>();
-        m_taskAttemptStartTime = new HashMap<>();
+        taskStartTime = new HashMap<>();
+        taskAttemptStartTime = new HashMap<>();
 
         this.configuration = configuration;
 
-        if (this.configuration != null && this.m_jobType == null) {
+        if (this.configuration != null && this.jobType == null) {
             this.setJobType(fetchJobType(this.configuration).toString());
         }
-        this.sumMapTaskDuration = 0l;
-        this.sumReduceTaskDuration = 0l;
+        this.sumMapTaskDuration = 0L;
+        this.sumReduceTaskDuration = 0L;
     }
 
     public void register(HistoryJobEntityLifecycleListener lifecycleListener) {
@@ -135,8 +135,8 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
     @Override
     public void close() throws IOException {
         // check if this job history file is complete
-        if (m_jobExecutionEntity.getEndTime() == 0L) {
-            throw new IOException(new JHFWriteNotCompletedException(m_jobId));
+        if (jobExecutionEntity.getEndTime() == 0L) {
+            throw new IOException(new JHFWriteNotCompletedException(jobId));
         }
         try {
             flush();
@@ -154,134 +154,135 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
     }
 
     /**
+     * ...
      * @param id
      */
     private void setJobID(String id) {
-        this.m_jobId = id;
+        this.jobId = id;
     }
 
     private void setJobType(String jobType) {
-        this.m_jobType = jobType;
+        this.jobType = jobType;
     }
 
     protected void handleJob(EventType eventType, Map<Keys, String> values, Object totalCounters) throws Exception {
         String id = values.get(Keys.JOBID);
 
-        if (m_jobId == null) {
+        if (jobId == null) {
             setJobID(id);
-        } else if (!m_jobId.equals(id)) {
-            String msg = "Current job ID '" + id + "' does not match previously stored value '" + m_jobId + "'";
+        } else if (!jobId.equals(id)) {
+            String msg = "Current job ID '" + id + "' does not match previously stored value '" + jobId + "'";
             LOG.error(msg);
             throw new ImportException(msg);
         }
 
         if (values.get(Keys.SUBMIT_TIME) != null) {  // job submitted
-            m_jobSubmitEventEntity.setTimestamp(Long.valueOf(values.get(Keys.SUBMIT_TIME)));
-            m_user = values.get(Keys.USER);
-            m_queueName = values.get(Keys.JOB_QUEUE);
-            m_jobName = values.get(Keys.JOBNAME);
+            jobSubmitEventEntity.setTimestamp(Long.valueOf(values.get(Keys.SUBMIT_TIME)));
+            user = values.get(Keys.USER);
+            queueName = values.get(Keys.JOB_QUEUE);
+            jobName = values.get(Keys.JOBNAME);
 
             // If given job name then use it as norm job name, otherwise use eagle JobNameNormalization rule to generate.
             String jobDefId = null;
             if (configuration != null) {
-                jobDefId = configuration.get(m_filter.getJobNameKey());
+                jobDefId = configuration.get(filter.getJobNameKey());
             }
 
             if (jobDefId == null) {
-                m_jobDefId = JobNameNormalization.getInstance().normalize(m_jobName);
+                this.jobDefId = JobNameNormalization.getInstance().normalize(jobName);
             } else {
                 LOG.debug("Got JobDefId from job configuration for " + id + ": " + jobDefId);
-                m_jobDefId = jobDefId;
+                this.jobDefId = jobDefId;
             }
 
-            LOG.info("JobDefId of " + id + ": " + m_jobDefId);
+            LOG.info("JobDefId of " + id + ": " + this.jobDefId);
 
-            m_jobSubmitEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
-            m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
-            m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), EagleJobStatus.SUBMITTED.name());
-            m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
-            m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
-            m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.m_jobType);
-            entityCreated(m_jobSubmitEventEntity);
+            jobSubmitEventEntity.getTags().put(MRJobTagName.USER.toString(), user);
+            jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), jobId);
+            jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), EagleJobStatus.SUBMITTED.name());
+            jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), jobName);
+            jobSubmitEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), this.jobDefId);
+            jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.jobType);
+            entityCreated(jobSubmitEventEntity);
         } else if (values.get(Keys.LAUNCH_TIME) != null) {  // job launched
-            m_jobLaunchEventEntity.setTimestamp(Long.valueOf(values.get(Keys.LAUNCH_TIME)));
-            m_jobLauchTime = m_jobLaunchEventEntity.getTimestamp();
-            m_jobLaunchEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
-            m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
-            m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), EagleJobStatus.LAUNCHED.name());
-            m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
-            m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
-            m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.m_jobType);
-            m_numTotalMaps = Integer.valueOf(values.get(Keys.TOTAL_MAPS));
-            m_numTotalReduces = Integer.valueOf(values.get(Keys.TOTAL_REDUCES));
-            entityCreated(m_jobLaunchEventEntity);
+            jobLaunchEventEntity.setTimestamp(Long.valueOf(values.get(Keys.LAUNCH_TIME)));
+            jobLaunchTime = jobLaunchEventEntity.getTimestamp();
+            jobLaunchEventEntity.getTags().put(MRJobTagName.USER.toString(), user);
+            jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), jobId);
+            jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), EagleJobStatus.LAUNCHED.name());
+            jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), jobName);
+            jobLaunchEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), jobDefId);
+            jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.jobType);
+            numTotalMaps = Integer.valueOf(values.get(Keys.TOTAL_MAPS));
+            numTotalReduces = Integer.valueOf(values.get(Keys.TOTAL_REDUCES));
+            entityCreated(jobLaunchEventEntity);
         } else if (values.get(Keys.FINISH_TIME) != null) {  // job finished
-            m_jobFinishEventEntity.setTimestamp(Long.valueOf(values.get(Keys.FINISH_TIME)));
-            m_jobFinishEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
-            m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
-            m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), values.get(Keys.JOB_STATUS));
-            m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
-            m_jobFinishEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
-            m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.m_jobType);
-            entityCreated(m_jobFinishEventEntity);
+            jobFinishEventEntity.setTimestamp(Long.valueOf(values.get(Keys.FINISH_TIME)));
+            jobFinishEventEntity.getTags().put(MRJobTagName.USER.toString(), user);
+            jobFinishEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), jobId);
+            jobFinishEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), values.get(Keys.JOB_STATUS));
+            jobFinishEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), jobName);
+            jobFinishEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), jobDefId);
+            jobFinishEventEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.jobType);
+            entityCreated(jobFinishEventEntity);
 
             // populate jobExecutionEntity entity
-            m_jobExecutionEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
-            m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
-            m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
-            m_jobExecutionEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
-            m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_QUEUE.toString(), m_queueName);
-            m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.m_jobType);
-
-            m_jobExecutionEntity.setCurrentState(values.get(Keys.JOB_STATUS));
-            m_jobExecutionEntity.setStartTime(m_jobLaunchEventEntity.getTimestamp());
-            m_jobExecutionEntity.setEndTime(m_jobFinishEventEntity.getTimestamp());
-            m_jobExecutionEntity.setDurationTime(m_jobExecutionEntity.getEndTime() - m_jobExecutionEntity.getStartTime());
-            m_jobExecutionEntity.setTimestamp(m_jobLaunchEventEntity.getTimestamp());
-            m_jobExecutionEntity.setSubmissionTime(m_jobSubmitEventEntity.getTimestamp());
+            jobExecutionEntity.getTags().put(MRJobTagName.USER.toString(), user);
+            jobExecutionEntity.getTags().put(MRJobTagName.JOB_ID.toString(), jobId);
+            jobExecutionEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), jobName);
+            jobExecutionEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), jobDefId);
+            jobExecutionEntity.getTags().put(MRJobTagName.JOB_QUEUE.toString(), queueName);
+            jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.jobType);
+
+            jobExecutionEntity.setCurrentState(values.get(Keys.JOB_STATUS));
+            jobExecutionEntity.setStartTime(jobLaunchEventEntity.getTimestamp());
+            jobExecutionEntity.setEndTime(jobFinishEventEntity.getTimestamp());
+            jobExecutionEntity.setDurationTime(jobExecutionEntity.getEndTime() - jobExecutionEntity.getStartTime());
+            jobExecutionEntity.setTimestamp(jobLaunchEventEntity.getTimestamp());
+            jobExecutionEntity.setSubmissionTime(jobSubmitEventEntity.getTimestamp());
             if (values.get(Keys.FAILED_MAPS) != null) {
                 // for Artemis
-                m_jobExecutionEntity.setNumFailedMaps(Integer.valueOf(values.get(Keys.FAILED_MAPS)));
+                jobExecutionEntity.setNumFailedMaps(Integer.valueOf(values.get(Keys.FAILED_MAPS)));
             }
             if (values.get(Keys.FAILED_REDUCES) != null) {
                 // for Artemis
-                m_jobExecutionEntity.setNumFailedReduces(Integer.valueOf(values.get(Keys.FAILED_REDUCES)));
+                jobExecutionEntity.setNumFailedReduces(Integer.valueOf(values.get(Keys.FAILED_REDUCES)));
             }
-            m_jobExecutionEntity.setNumFinishedMaps(Integer.valueOf(values.get(Keys.FINISHED_MAPS)));
-            m_jobExecutionEntity.setNumFinishedReduces(Integer.valueOf(values.get(Keys.FINISHED_REDUCES)));
-            m_jobExecutionEntity.setNumTotalMaps(m_numTotalMaps);
-            m_jobExecutionEntity.setNumTotalReduces(m_numTotalReduces);
+            jobExecutionEntity.setNumFinishedMaps(Integer.valueOf(values.get(Keys.FINISHED_MAPS)));
+            jobExecutionEntity.setNumFinishedReduces(Integer.valueOf(values.get(Keys.FINISHED_REDUCES)));
+            jobExecutionEntity.setNumTotalMaps(numTotalMaps);
+            jobExecutionEntity.setNumTotalReduces(numTotalReduces);
             if (values.get(Keys.COUNTERS) != null || totalCounters != null) {
                 JobCounters jobCounters = parseCounters(totalCounters);
-                m_jobExecutionEntity.setJobCounters(jobCounters);
+                jobExecutionEntity.setJobCounters(jobCounters);
                 if (jobCounters.getCounters().containsKey(Constants.JOB_COUNTER)) {
                     Map<String, Long> counters = jobCounters.getCounters().get(Constants.JOB_COUNTER);
                     if (counters.containsKey(Constants.JobCounter.DATA_LOCAL_MAPS.toString())) {
-                        m_jobExecutionEntity.setDataLocalMaps(counters.get(Constants.JobCounter.DATA_LOCAL_MAPS.toString()).intValue());
+                        jobExecutionEntity.setDataLocalMaps(counters.get(Constants.JobCounter.DATA_LOCAL_MAPS.toString()).intValue());
                     }
 
                     if (counters.containsKey(Constants.JobCounter.RACK_LOCAL_MAPS.toString())) {
-                        m_jobExecutionEntity.setRackLocalMaps(counters.get(Constants.JobCounter.RACK_LOCAL_MAPS.toString()).intValue());
+                        jobExecutionEntity.setRackLocalMaps(counters.get(Constants.JobCounter.RACK_LOCAL_MAPS.toString()).intValue());
                     }
 
                     if (counters.containsKey(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString())) {
-                        m_jobExecutionEntity.setTotalLaunchedMaps(counters.get(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString()).intValue());
+                        jobExecutionEntity.setTotalLaunchedMaps(counters.get(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString()).intValue());
                     }
                 }
 
-                if (m_jobExecutionEntity.getTotalLaunchedMaps() > 0) {
-                    m_jobExecutionEntity.setDataLocalMapsPercentage(m_jobExecutionEntity.getDataLocalMaps() * 1.0 / m_jobExecutionEntity.getTotalLaunchedMaps());
-                    m_jobExecutionEntity.setRackLocalMapsPercentage(m_jobExecutionEntity.getRackLocalMaps() * 1.0 / m_jobExecutionEntity.getTotalLaunchedMaps());
+                if (jobExecutionEntity.getTotalLaunchedMaps() > 0) {
+                    jobExecutionEntity.setDataLocalMapsPercentage(jobExecutionEntity.getDataLocalMaps() * 1.0 / jobExecutionEntity.getTotalLaunchedMaps());
+                    jobExecutionEntity.setRackLocalMapsPercentage(jobExecutionEntity.getRackLocalMaps() * 1.0 / jobExecutionEntity.getTotalLaunchedMaps());
                 }
             }
-            m_jobExecutionEntity.setAvgMapTaskDuration(this.sumMapTaskDuration * 1.0 / m_numTotalMaps);
-            if (m_numTotalReduces == 0) {
-                m_jobExecutionEntity.setMaxReduceTaskDuration(0);
-                m_jobExecutionEntity.setAvgReduceTaskDuration(0);
+            jobExecutionEntity.setAvgMapTaskDuration(this.sumMapTaskDuration * 1.0 / numTotalMaps);
+            if (numTotalReduces == 0) {
+                jobExecutionEntity.setMaxReduceTaskDuration(0);
+                jobExecutionEntity.setAvgReduceTaskDuration(0);
             } else {
-                m_jobExecutionEntity.setAvgReduceTaskDuration(this.sumReduceTaskDuration * 1.0 / m_numTotalReduces);
+                jobExecutionEntity.setAvgReduceTaskDuration(this.sumReduceTaskDuration * 1.0 / numTotalReduces);
             }
-            entityCreated(m_jobExecutionEntity);
+            entityCreated(jobExecutionEntity);
         }
     }
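
The FINISH_TIME branch above is where the job-level rollups get computed. A minimal, self-contained sketch of that arithmetic, with plain locals standing in for the JobExecutionAPIEntity setters and invented sample values (note the patch guards the reduce average against map-only jobs, but has no matching guard for numTotalMaps == 0):

    // Sketch only: mirrors the rollup math above; all values are invented.
    public class JobRollupSketch {
        public static void main(String[] args) {
            long launchTs = 1_000L;                       // LAUNCH_TIME event
            long finishTs = 61_000L;                      // FINISH_TIME event
            long sumMapTaskDuration = 120_000L;
            long sumReduceTaskDuration = 0L;
            int numTotalMaps = 8, numTotalReduces = 0;
            int dataLocalMaps = 6, totalLaunchedMaps = 8;

            long durationTime = finishTs - launchTs;      // setDurationTime(...)
            double avgMapTaskDuration = sumMapTaskDuration * 1.0 / numTotalMaps;
            double avgReduceTaskDuration = (numTotalReduces == 0)
                    ? 0 : sumReduceTaskDuration * 1.0 / numTotalReduces;
            double dataLocalMapsPercentage = (totalLaunchedMaps > 0)
                    ? dataLocalMaps * 1.0 / totalLaunchedMaps : 0;

            System.out.printf("duration=%dms avgMap=%.1f avgReduce=%.1f dataLocal=%.2f%n",
                    durationTime, avgMapTaskDuration, avgReduceTaskDuration, dataLocalMapsPercentage);
        }
    }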
 
@@ -290,8 +291,8 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
             lifecycleListener.jobEntityCreated(entity);
         }
 
-        // job finished when passing JobExecutionAPIEntity: m_jobExecutionEntity
-        if (entity == this.m_jobExecutionEntity) {
+        // job finished when passing JobExecutionAPIEntity: jobExecutionEntity
+        if (entity == this.jobExecutionEntity) {
             for (HistoryJobEntityLifecycleListener lifecycleListener : this.jobEntityLifecycleListeners) {
                 lifecycleListener.jobFinish();
             }
@@ -317,49 +318,51 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
         final String taskType = values.get(Keys.TASK_TYPE);
         final String taskID = values.get(Keys.TASKID);
 
-        Map<String, String> taskBaseTags = new HashMap<String, String>() {{
-            put(MRJobTagName.TASK_TYPE.toString(), taskType);
-            put(MRJobTagName.USER.toString(), m_user);
-            //put(MRJobTagName.JOB_NAME.toString(), _jobName);
-            put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
-            put(MRJobTagName.JOB_TYPE.toString(), m_jobType);
-            put(MRJobTagName.JOB_ID.toString(), m_jobId);
-            put(MRJobTagName.TASK_ID.toString(), taskID);
-        }};
-        taskBaseTags.putAll(m_baseTags);
+        Map<String, String> taskBaseTags = new HashMap<String, String>() {
+            {
+                put(MRJobTagName.TASK_TYPE.toString(), taskType);
+                put(MRJobTagName.USER.toString(), user);
+                //put(MRJobTagName.JOB_NAME.toString(), _jobName);
+                put(MRJobTagName.JOD_DEF_ID.toString(), jobDefId);
+                put(MRJobTagName.JOB_TYPE.toString(), jobType);
+                put(MRJobTagName.JOB_ID.toString(), jobId);
+                put(MRJobTagName.TASK_ID.toString(), taskID);
+            }
+        };
+        taskBaseTags.putAll(baseTags);
         if (recType == RecordTypes.Task && startTime != null) { // task start, no host is assigned yet
-            m_taskStartTime.put(taskID, Long.valueOf(startTime));
+            taskStartTime.put(taskID, Long.valueOf(startTime));
         } else if (recType == RecordTypes.Task && finishTime != null) { // task finish
             // task execution entity setup
-            TaskExecutionAPIEntity entity = new TaskExecutionAPIEntity();
             Map<String, String> taskExecutionTags = new HashMap<>(taskBaseTags);
-            String hostname = m_taskRunningHosts.get(taskID);
+            String hostname = taskRunningHosts.get(taskID);
             hostname = (hostname == null) ? "" : hostname; // TODO if task fails, then no hostname
             taskExecutionTags.put(MRJobTagName.HOSTNAME.toString(), hostname);
-            taskExecutionTags.put(MRJobTagName.RACK.toString(), m_host2RackMapping.get(hostname));
+            taskExecutionTags.put(MRJobTagName.RACK.toString(), host2RackMapping.get(hostname));
+            TaskExecutionAPIEntity entity = new TaskExecutionAPIEntity();
             entity.setTags(taskExecutionTags);
-            entity.setStartTime(m_taskStartTime.get(taskID));
+            entity.setStartTime(taskStartTime.get(taskID));
             entity.setEndTime(Long.valueOf(finishTime));
             entity.setDuration(entity.getEndTime() - entity.getStartTime());
-            entity.setTimestamp(m_jobLauchTime);
+            entity.setTimestamp(jobLaunchTime);
             entity.setError(values.get(Keys.ERROR));
             entity.setTaskStatus(values.get(Keys.TASK_STATUS));
             if (values.get(Keys.COUNTERS) != null || counters != null) {
                 entity.setJobCounters(parseCounters(counters));
             }
-            long duration = entity.getEndTime() - m_jobSubmitEventEntity.getTimestamp();
-            if (taskType.equals(Constants.TaskType.MAP.toString()) && duration > m_jobExecutionEntity.getLastMapDuration()) {
-                m_jobExecutionEntity.setLastMapDuration(duration);
+            long duration = entity.getEndTime() - jobSubmitEventEntity.getTimestamp();
+            if (taskType.equals(Constants.TaskType.MAP.toString()) && duration > jobExecutionEntity.getLastMapDuration()) {
+                jobExecutionEntity.setLastMapDuration(duration);
             }
-            if (taskType.equals(Constants.TaskType.REDUCE.toString()) && duration > m_jobExecutionEntity.getLastReduceDuration()) {
-                m_jobExecutionEntity.setLastReduceDuration(duration);
+            if (taskType.equals(Constants.TaskType.REDUCE.toString()) && duration > jobExecutionEntity.getLastReduceDuration()) {
+                jobExecutionEntity.setLastReduceDuration(duration);
             }
 
-            if (taskType.equals(Constants.TaskType.MAP.toString()) && entity.getDuration() > m_jobExecutionEntity.getMaxMapTaskDuration()) {
-                m_jobExecutionEntity.setMaxMapTaskDuration(entity.getDuration());
+            if (taskType.equals(Constants.TaskType.MAP.toString()) && entity.getDuration() > jobExecutionEntity.getMaxMapTaskDuration()) {
+                jobExecutionEntity.setMaxMapTaskDuration(entity.getDuration());
             }
-            if (taskType.equals(Constants.TaskType.REDUCE.toString()) && entity.getDuration() > m_jobExecutionEntity.getMaxReduceTaskDuration()) {
-                m_jobExecutionEntity.setMaxReduceTaskDuration(entity.getDuration());
+            if (taskType.equals(Constants.TaskType.REDUCE.toString()) && entity.getDuration() > jobExecutionEntity.getMaxReduceTaskDuration()) {
+                jobExecutionEntity.setMaxReduceTaskDuration(entity.getDuration());
             }
 
             if (taskType.equals(Constants.TaskType.MAP.toString())) {
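
The task-finish branch above folds each task into job-level extremes with a compare-and-keep-maximum pattern: duration (task end minus job submit time) feeds the last-map/last-reduce durations, and entity.getDuration() feeds the max-task durations. A stand-alone sketch, with a hypothetical TaskStats holder in place of jobExecutionEntity:

    // TaskStats is a hypothetical holder illustrating the running-maximum
    // updates applied to jobExecutionEntity above; map side only.
    class TaskStats {
        long lastMapDuration;       // latest map finish, relative to job submit
        long maxMapTaskDuration;    // longest single map task

        void onMapFinished(long sinceSubmit, long taskDuration) {
            if (sinceSubmit > lastMapDuration) {
                lastMapDuration = sinceSubmit;
            }
            if (taskDuration > maxMapTaskDuration) {
                maxMapTaskDuration = taskDuration;
            }
        }
    }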
@@ -371,7 +374,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
             entityCreated(entity);
             //_taskStartTime.remove(taskID); // clean this taskID
         } else if ((recType == RecordTypes.MapAttempt || recType == RecordTypes.ReduceAttempt) && startTime != null) { // task attempt start
-            m_taskAttemptStartTime.put(taskAttemptID, Long.valueOf(startTime));
+            taskAttemptStartTime.put(taskAttemptID, Long.valueOf(startTime));
         } else if ((recType == RecordTypes.MapAttempt || recType == RecordTypes.ReduceAttempt) && finishTime != null) {   // task attempt finish
             TaskAttemptExecutionAPIEntity entity = new TaskAttemptExecutionAPIEntity();
             Map<String, String> taskAttemptExecutionTags = new HashMap<>(taskBaseTags);
@@ -381,16 +384,16 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
             taskAttemptExecutionTags.put(MRJobTagName.HOSTNAME.toString(), hostname);
             taskAttemptExecutionTags.put(MRJobTagName.RACK.toString(), rack);
             // put last attempt's hostname to task level
-            m_taskRunningHosts.put(taskID, hostname);
+            taskRunningHosts.put(taskID, hostname);
             // it is very likely that an attempt ID could be both succeeded and failed due to M/R system
             // in this case, we should ignore this attempt?
-            if (m_taskAttemptStartTime.get(taskAttemptID) == null) {
+            if (taskAttemptStartTime.get(taskAttemptID) == null) {
                 LOG.warn("task attemp has consistency issue " + taskAttemptID);
                 return;
             }
-            entity.setStartTime(m_taskAttemptStartTime.get(taskAttemptID));
+            entity.setStartTime(taskAttemptStartTime.get(taskAttemptID));
             entity.setEndTime(Long.valueOf(finishTime));
-            entity.setTimestamp(m_jobLauchTime);
+            entity.setTimestamp(jobLaunchTime);
             entity.setDuration(entity.getEndTime() - entity.getStartTime());
             entity.setTaskStatus(values.get(Keys.TASK_STATUS));
             entity.setError(values.get(Keys.ERROR));
@@ -400,7 +403,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
             }
             entity.setTaskAttemptID(taskAttemptID);
             entityCreated(entity);
-            m_taskAttemptStartTime.remove(taskAttemptID);
+            taskAttemptStartTime.remove(taskAttemptID);
         } else {
             // silently ignore
             LOG.warn("It's an exceptional case ?");
@@ -410,7 +413,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
     public void parseConfiguration() throws Exception {
         Map<String, String> prop = new TreeMap<>();
 
-        if (m_filter.acceptJobConfFile()) {
+        if (filter.acceptJobConfFile()) {
             Iterator<Map.Entry<String, String>> iter = configuration.iterator();
             while (iter.hasNext()) {
                 String key = iter.next().getKey();
@@ -423,28 +426,28 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
         // check must-have keys are within prop
         if (matchMustHaveKeyPatterns(prop)) {
             JobConfigurationAPIEntity jobConfigurationEntity = new JobConfigurationAPIEntity();
-            jobConfigurationEntity.setTags(new HashMap<>(m_baseTags));
-            jobConfigurationEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
-            jobConfigurationEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
-            jobConfigurationEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
-            jobConfigurationEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
-            jobConfigurationEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), m_jobType);
-            jobConfigurationEntity.setTimestamp(m_jobLaunchEventEntity.getTimestamp());
+            jobConfigurationEntity.setTags(new HashMap<>(baseTags));
+            jobConfigurationEntity.getTags().put(MRJobTagName.USER.toString(), user);
+            jobConfigurationEntity.getTags().put(MRJobTagName.JOB_ID.toString(), jobId);
+            jobConfigurationEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), jobName);
+            jobConfigurationEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), jobDefId);
+            jobConfigurationEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), jobType);
+            jobConfigurationEntity.setTimestamp(jobLaunchEventEntity.getTimestamp());
 
             JobConfig jobConfig = new JobConfig();
             jobConfig.setConfig(prop);
             jobConfigurationEntity.setJobConfig(jobConfig);
-            jobConfigurationEntity.setConfigJobName(m_jobDefId);
+            jobConfigurationEntity.setConfigJobName(jobDefId);
             entityCreated(jobConfigurationEntity);
         }
     }
 
     private boolean matchMustHaveKeyPatterns(Map<String, String> prop) {
-        if (m_filter.getMustHaveJobConfKeyPatterns() == null) {
+        if (filter.getMustHaveJobConfKeyPatterns() == null) {
             return true;
         }
 
-        for (Pattern p : m_filter.getMustHaveJobConfKeyPatterns()) {
+        for (Pattern p : filter.getMustHaveJobConfKeyPatterns()) {
             boolean matched = false;
             for (String key : prop.keySet()) {
                 if (p.matcher(key).matches()) {
@@ -460,10 +463,10 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
     }
 
     private boolean included(String key) {
-        if (m_filter.getJobConfKeyInclusionPatterns() == null) {
+        if (filter.getJobConfKeyInclusionPatterns() == null) {
             return true;
         }
-        for (Pattern p : m_filter.getJobConfKeyInclusionPatterns()) {
+        for (Pattern p : filter.getJobConfKeyInclusionPatterns()) {
             Matcher m = p.matcher(key);
             if (m.matches()) {
                 LOG.info("include key: " + p.toString());
@@ -474,10 +477,10 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
     }
 
     private boolean excluded(String key) {
-        if (m_filter.getJobConfKeyExclusionPatterns() == null) {
+        if (filter.getJobConfKeyExclusionPatterns() == null) {
             return false;
         }
-        for (Pattern p : m_filter.getJobConfKeyExclusionPatterns()) {
+        for (Pattern p : filter.getJobConfKeyExclusionPatterns()) {
             Matcher m = p.matcher(key);
             if (m.matches()) {
                 return true;
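
The three pattern helpers above make up the job-configuration key filter: matchMustHaveKeyPatterns rejects a configuration unless every configured pattern matches at least one key, while included/excluded gate individual keys (presumably combined as included(key) && !excluded(key) at the call site, which is outside these hunks). A self-contained sketch of the must-have check, using assumed patterns and keys:

    import java.util.*;
    import java.util.regex.Pattern;

    public class MustHaveKeySketch {
        // Same shape as matchMustHaveKeyPatterns above: every pattern must
        // match at least one key or the whole configuration is rejected.
        static boolean matchMustHaveKeyPatterns(List<Pattern> mustHave, Set<String> keys) {
            if (mustHave == null) {
                return true;
            }
            for (Pattern p : mustHave) {
                boolean matched = false;
                for (String key : keys) {
                    if (p.matcher(key).matches()) {
                        matched = true;
                        break;
                    }
                }
                if (!matched) {
                    return false;
                }
            }
            return true;
        }

        public static void main(String[] args) {
            List<Pattern> mustHave = Arrays.asList(Pattern.compile("mapreduce\\.job\\..*"));
            Set<String> keys = new HashSet<>(Arrays.asList("mapreduce.job.name", "fs.defaultFS"));
            System.out.println(matchMustHaveKeyPatterns(mustHave, keys)); // true
        }
    }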

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
index 6932dad..e20836f 100644
--- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
+++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
@@ -37,14 +37,12 @@ import java.util.Map;
 
 /**
  * Listener holds all informations related to one whole job history file, so it's stateful and does not support multithreading.
- *
- * @author yonzhang
  */
 public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer1PerLineListener {
     private static final Logger logger = LoggerFactory.getLogger(JHFMRVer1EventReader.class);
 
     /**
-     * baseTags stores the basic tag name values which might be used for persisting various entities
+     * baseTags stores the basic tag name values which might be used for persisting various entities.
      * baseTags includes: cluster, datacenter and jobName
      * baseTags are used for all job/task related entities
      *
@@ -93,12 +91,12 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer
             String[] tmp = decoratedHostname.split("/");
             hostname = tmp[tmp.length - 1];
             rack = tmp[tmp.length - 2];
-            m_host2RackMapping.put(hostname, rack);
+            host2RackMapping.put(hostname, rack);
         } else if (values.get(Keys.TASK_STATUS).equals(EagleTaskStatus.KILLED.name()) || values.get(Keys.TASK_STATUS).equals(EagleTaskStatus.FAILED.name())) {
             hostname = values.get(Keys.HOSTNAME);
             // make every effort to get RACK information
             hostname = (hostname == null) ? "" : hostname;
-            rack = m_host2RackMapping.get(hostname);
+            rack = host2RackMapping.get(hostname);
         }
 
         values.put(Keys.HOSTNAME, hostname);
@@ -149,6 +147,6 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer
     }
 
     public JobExecutionAPIEntity jobExecution() {
-        return m_jobExecutionEntity;
+        return jobExecutionEntity;
     }
 }
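
For context on the host2RackMapping rename above: a successful attempt reports a "decorated" hostname of the form /rack/host, which the reader splits to recover both pieces; killed or failed attempts report only a bare hostname, so the cached mapping is used to make a best effort at the rack. A tiny sketch with an assumed input value:

    public class HostRackSketch {
        public static void main(String[] args) {
            String decoratedHostname = "/default-rack/worker-01"; // assumed example
            String[] tmp = decoratedHostname.split("/");
            String hostname = tmp[tmp.length - 1];                // "worker-01"
            String rack = tmp[tmp.length - 2];                    // "default-rack"
            System.out.println(hostname + " is in rack " + rack);
        }
    }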

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java
 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java
index ab59a41..2f63703 100644
--- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java
+++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java
@@ -43,10 +43,10 @@ public class JHFMRVer1Parser implements JHFParserBase {
     static final Pattern pattern = Pattern.compile(KEY + "=" + "\"" + VALUE + "\"");
     static final String MAX_COUNTER_COUNT = "10000";
 
-    private JHFMRVer1EventReader m_reader;
+    private JHFMRVer1EventReader reader;
 
     public JHFMRVer1Parser(JHFMRVer1EventReader reader) {
-        this.m_reader = reader;
+        this.reader = reader;
     }
 
     /**
@@ -85,13 +85,14 @@ public class JHFMRVer1Parser implements JHFParserBase {
                     buf.append("\n");
                     continue;
                 }
-                parseLine(buf.toString(), m_reader, isEscaped);
+                parseLine(buf.toString(), this.reader, isEscaped);
                 buf = new StringBuffer();
-            } while ((line = reader.readLine()) != null);
+            }
+            while ((line = reader.readLine()) != null);
 
             // flush to tell listener that we have finished parsing
             logger.info("finish parsing job history file and close");
-            m_reader.close();
+            this.reader.close();
         } catch (Exception ex) {
             logger.error("can not parse correctly ", ex);
             throw ex;
@@ -121,10 +122,6 @@ public class JHFMRVer1Parser implements JHFParserBase {
             parseBuffer.put(Keys.valueOf(parts[0]), value);
         }
 
-//        if(conf!=null){
-//            parseBuffer.put(Keys.NORM_JOBNAME, conf.get(JPAConstants.JOB_CONF_NORM_JOBNAME_KEY));
-//        }
-
         try {
             l.handle(RecordTypes.valueOf(recType), parseBuffer);
         } catch (IllegalArgumentException ex) {
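
Two notes on this hunk. The reflowed loop is still a single do ... while statement; Java ignores the line break between } and while (...), so the checkstyle-driven reformat does not change behavior, although it now reads like an empty while loop. Separately, each history line is a record type followed by KEY="VALUE" pairs matched by the pattern field above; a sketch with simplified KEY/VALUE stand-ins (the real constants, and the isEscaped handling for embedded quotes, are more involved):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class Ver1LineSketch {
        // Simplified stand-ins for the parser's KEY and VALUE constants.
        static final String KEY = "(\\w+)";
        static final String VALUE = "([^\"]*)";
        static final Pattern PAIR = Pattern.compile(KEY + "=" + "\"" + VALUE + "\"");

        public static void main(String[] args) {
            // Assumed sample line in the MRv1 history format.
            String line = "Job JOBID=\"job_201608010000_0001\" JOB_STATUS=\"SUCCESS\"";
            Matcher m = PAIR.matcher(line);
            while (m.find()) {
                System.out.println(m.group(1) + " -> " + m.group(2));
            }
        }
    }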

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
index 4ca0449..f21fd41 100644
--- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
+++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
@@ -33,14 +33,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class JHFMRVer2EventReader extends JHFEventReaderBase {
     private static final Logger logger = LoggerFactory.getLogger(JHFMRVer2EventReader.class);
 
     /**
-     * Create a new Event Reader
+     * Create a new Event Reader.
      *
      * @throws IOException
      */
@@ -455,7 +454,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
         String[] tmp = rackname.split("/");
         String rack = tmp[tmp.length - 1];
         values.put(Keys.RACK, rack);
-        m_host2RackMapping.put(values.get(Keys.HOSTNAME), rack);
+        host2RackMapping.put(values.get(Keys.HOSTNAME), rack);
     }
 
     private void handleMapAttemptFailed(Event wrapper) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java
 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java
index 2ccbf8d..f93f942 100644
--- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java
+++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java
@@ -33,10 +33,10 @@ import java.io.InputStream;
 
 public class JHFMRVer2Parser implements JHFParserBase {
     private static final Logger logger = LoggerFactory.getLogger(JHFMRVer2Parser.class);
-    private JHFMRVer2EventReader _reader;
+    private JHFMRVer2EventReader reader;
 
     public JHFMRVer2Parser(JHFMRVer2EventReader reader) {
-        this._reader = reader;
+        this.reader = reader;
     }
 
     @SuppressWarnings( {"rawtypes", "deprecation"})
@@ -58,11 +58,11 @@ public class JHFMRVer2Parser implements JHFParserBase {
             Event wrapper;
             while ((wrapper = getNextEvent(datumReader, decoder)) != null) {
                 ++eventCtr;
-                _reader.handleEvent(wrapper);
+                reader.handleEvent(wrapper);
             }
-            _reader.parseConfiguration();
+            reader.parseConfiguration();
             // don't need put to finally as it's a kind of flushing data
-            _reader.close();
+            reader.close();
             logger.info("reader used " + (System.currentTimeMillis() - start) 
+ "ms");
         } catch (Exception ioe) {
             logger.error("Caught exception parsing history file after " + 
eventCtr + " events", ioe);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java
 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java
index a2479ab..36a0ef7 100644
--- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java
+++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java
@@ -20,12 +20,9 @@ package org.apache.eagle.jpm.mr.history.parser;
 
 import java.io.InputStream;
 
-/**
- * @author yonzhang
- */
 public interface JHFParserBase {
     /**
-     * this method will ensure to close the inputstream
+     * this method will ensure to close the inputStream.
      *
      * @param is
      * @throws Exception

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java
 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java
index d5ed97a..64b9fe0 100644
--- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java
+++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java
@@ -19,15 +19,11 @@
 package org.apache.eagle.jpm.mr.history.parser;
 
 /**
- * used to warn that one job history file has not yet completed writing to hdfs
+ * used to warn that one job history file has not yet completed writing to hdfs.
  * This happens when feeder catches up and the history file has not been written into hdfs completely
  *
- * @author yonzhang
  */
 public class JHFWriteNotCompletedException extends Exception {
-    /**
-     *
-     */
     private static final long serialVersionUID = -3060175780718218490L;
 
     public JHFWriteNotCompletedException(String msg) {
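
A hedged usage sketch: a caller that detects a partially written history file can raise this exception and retry on the next pass. Only the exception type comes from the class above; looksComplete is a hypothetical check:

    // Hypothetical caller, for illustration only.
    void ensureFullyWritten(String path, boolean looksComplete) throws JHFWriteNotCompletedException {
        if (!looksComplete) {
            throw new JHFWriteNotCompletedException("history file not fully written to hdfs yet: " + path);
        }
    }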
