[EAGLE-515] Refactor JobHistoryZKStateManager into a singleton

Author: wujinhu <wujinhu...@126.com>

Closes #411 from wujinhu/EAGLE-515.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/c940f56c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/c940f56c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/c940f56c

Branch: refs/heads/master
Commit: c940f56c27296c5fec657bd240e96c5522199262
Parents: 8de69a8
Author: wujinhu <wujinhu...@126.com>
Authored: Thu Sep 1 11:26:10 2016 +0800
Committer: Hao Chen <h...@apache.org>
Committed: Thu Sep 1 11:26:10 2016 +0800

----------------------------------------------------------------------
 .../mr/history/crawler/JHFCrawlerDriverImpl.java   | 17 +++++++----------
 .../JobEntityCreationEagleServiceListener.java     |  4 +---
 .../jpm/mr/history/storm/JobHistorySpout.java      | 10 ++++------
 .../mr/history/zkres/JobHistoryZKStateManager.java |  8 +++++++-
 4 files changed, 19 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c940f56c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
index 1a17751..077f4e1 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
@@ -20,7 +20,7 @@ package org.apache.eagle.jpm.mr.history.crawler;
 
 import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.history.parser.EagleJobStatus;
-import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateLCM;
+import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager;
 import org.apache.eagle.jpm.mr.historyentity.JobCountEntity;
 import org.apache.eagle.jpm.util.JobIdFilter;
 import org.apache.commons.lang3.tuple.Pair;
@@ -58,7 +58,6 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver 
{
     private JHFInputStreamCallback reader;
     protected boolean zeroBasedMonth = true;
 
-    private JobHistoryZKStateLCM zkStateLcm;
     private JobHistoryLCM jhfLCM;
     private JobIdFilter jobFilter;
     private int partitionId;
@@ -69,7 +68,6 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver 
{
     public JHFCrawlerDriverImpl(MRHistoryJobConfig.EagleServiceConfig 
eagleServiceConfig,
                                 MRHistoryJobConfig.JobExtractorConfig 
jobExtractorConfig,
                                 MRHistoryJobConfig.ControlConfig 
controlConfig, JHFInputStreamCallback reader,
-                                JobHistoryZKStateLCM zkStateLCM,
                                 JobHistoryLCM historyLCM, JobIdFilter 
jobFilter, int partitionId) throws Exception {
         this.eagleServiceConfig = eagleServiceConfig;
         this.jobExtractorConfig = jobExtractorConfig;
@@ -80,7 +78,6 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver 
{
         }
         this.reader = reader;
         jhfLCM = historyLCM;//new JobHistoryDAOImpl(jobHistoryConfig);
-        this.zkStateLcm = zkStateLCM;
         this.partitionId = partitionId;
         this.jobFilter = jobFilter;
         timeZone = TimeZone.getTimeZone(controlConfig.timeZone);
@@ -187,7 +184,7 @@ public class JHFCrawlerDriverImpl implements 
JHFCrawlerDriver {
                     jobHistoryFile,
                 reader);
         }
-        zkStateLcm.addProcessedJob(String.format(FORMAT_JOB_PROCESS_DATE,
+        
JobHistoryZKStateManager.instance().addProcessedJob(String.format(FORMAT_JOB_PROCESS_DATE,
                 this.processDate.year,
                 this.processDate.month + 1,
                 this.processDate.day),
@@ -202,7 +199,7 @@ public class JHFCrawlerDriverImpl implements 
JHFCrawlerDriver {
     private void updateProcessDate() throws Exception {
         String line = String.format(FORMAT_JOB_PROCESS_DATE, 
this.processDate.year,
                 this.processDate.month + 1, this.processDate.day);
-        zkStateLcm.updateProcessedDate(partitionId, line);
+        JobHistoryZKStateManager.instance().updateProcessedDate(partitionId, 
line);
     }
 
     private int getActualMonth(int month) {
@@ -220,7 +217,7 @@ public class JHFCrawlerDriverImpl implements 
JHFCrawlerDriver {
     }
 
     private void readAndCacheLastProcessedDate() throws Exception {
-        String lastProcessedDate = zkStateLcm.readProcessedDate(partitionId);
+        String lastProcessedDate = 
JobHistoryZKStateManager.instance().readProcessedDate(partitionId);
         Matcher m = PATTERN_JOB_PROCESS_DATE.matcher(lastProcessedDate);
         if (m.find() && m.groupCount() == 3) {
             this.processDate.year = Integer.parseInt(m.group(1));
@@ -233,7 +230,7 @@ public class JHFCrawlerDriverImpl implements 
JHFCrawlerDriver {
         GregorianCalendar cal = new GregorianCalendar(timeZone);
         cal.set(this.processDate.year, this.processDate.month, 
this.processDate.day, 0, 0, 0);
         cal.add(Calendar.DATE, 1);
-        List<String> list = 
zkStateLcm.readProcessedJobs(String.format(FORMAT_JOB_PROCESS_DATE, 
cal.get(Calendar.YEAR),
+        List<String> list = 
JobHistoryZKStateManager.instance().readProcessedJobs(String.format(FORMAT_JOB_PROCESS_DATE,
 cal.get(Calendar.YEAR),
                 cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DAY_OF_MONTH)));
         if (list != null) {
             this.processedJobFileNames = new HashSet<>(list);
@@ -241,7 +238,7 @@ public class JHFCrawlerDriverImpl implements 
JHFCrawlerDriver {
     }
 
     private void flushJobCount() throws Exception {
-        List<Pair<String, String>> jobs = zkStateLcm.getProcessedJobs(
+        List<Pair<String, String>> jobs = 
JobHistoryZKStateManager.instance().getProcessedJobs(
             String.format(FORMAT_JOB_PROCESS_DATE, this.processDate.year, 
this.processDate.month + 1, this.processDate.day)
         );
         JobCountEntity entity = new JobCountEntity();
@@ -300,7 +297,7 @@ public class JHFCrawlerDriverImpl implements 
JHFCrawlerDriver {
         cal.add(Calendar.DATE, -1 - PROCESSED_JOB_KEEP_DAYS);
         String line = String.format(FORMAT_JOB_PROCESS_DATE, 
cal.get(Calendar.YEAR),
                 cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DAY_OF_MONTH));
-        zkStateLcm.truncateProcessedJob(line);
+        JobHistoryZKStateManager.instance().truncateProcessedJob(line);
     }
 
     private boolean isToday() {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c940f56c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
index 520fbbc..30eeb54 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
@@ -90,13 +90,12 @@ public class JobEntityCreationEagleServiceListener 
implements HistoryJobEntityCr
             eagleServiceConfig.password);
 
         
client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 
1000);
-        JobHistoryZKStateManager zkState = new 
JobHistoryZKStateManager(configManager.getZkStateConfig());
         logger.info("start flushing entities of total number " + list.size());
         for (int i = 0; i < list.size(); i++) {
             JobBaseAPIEntity entity = list.get(i);
             if (entity instanceof JobExecutionAPIEntity) {
                 jobs.add((JobExecutionAPIEntity) entity);
-                
zkState.updateProcessedJob(timeStamp2Date(entity.getTimestamp()),
+                
JobHistoryZKStateManager.instance().updateProcessedJob(timeStamp2Date(entity.getTimestamp()),
                     entity.getTags().get(MRJobTagName.JOB_ID.toString()),
                     ((JobExecutionAPIEntity) entity).getCurrentState());
             } else if (entity instanceof JobEventAPIEntity) {
@@ -107,7 +106,6 @@ public class JobEntityCreationEagleServiceListener 
implements HistoryJobEntityCr
                 taskAttemptExecs.add((TaskAttemptExecutionAPIEntity) entity);
             }
         }
-        zkState.close();
         GenericServiceAPIResponseEntity result;
         if (jobs.size() > 0) {
             logger.info("flush JobExecutionAPIEntity of number " + 
jobs.size());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c940f56c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
index 402f93e..04283d3 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
@@ -87,7 +87,6 @@ public class JobHistorySpout extends BaseRichSpout {
 
     private int partitionId;
     private int numTotalPartitions;
-    private transient JobHistoryZKStateManager zkState;
     private transient JHFCrawlerDriver driver;
     private JobHistoryContentFilter contentFilter;
     private JobHistorySpoutCollectorInterceptor interceptor;
@@ -143,8 +142,8 @@ public class JobHistorySpout extends BaseRichSpout {
             throw new IllegalStateException(e);
         }
         JobIdFilter jobIdFilter = new JobIdFilterByPartition(partitioner, 
numTotalPartitions, partitionId);
-        zkState = new 
JobHistoryZKStateManager(configManager.getZkStateConfig());
-        zkState.ensureJobPartitions(numTotalPartitions);
+        
JobHistoryZKStateManager.instance().init(configManager.getZkStateConfig());
+        
JobHistoryZKStateManager.instance().ensureJobPartitions(numTotalPartitions);
         interceptor.setSpoutOutputCollector(collector);
 
         try {
@@ -154,7 +153,6 @@ public class JobHistorySpout extends BaseRichSpout {
                 configManager.getJobExtractorConfig(),
                 configManager.getControlConfig(),
                 callback,
-                zkState,
                 jhfLCM,
                 jobIdFilter,
                 partitionId);
@@ -168,7 +166,7 @@ public class JobHistorySpout extends BaseRichSpout {
     public void nextTuple() {
         try {
             Long modifiedTime = driver.crawl();
-            zkState.updateProcessedTimeStamp(partitionId, modifiedTime);
+            
JobHistoryZKStateManager.instance().updateProcessedTimeStamp(partitionId, 
modifiedTime);
             updateProcessedTimeStamp(modifiedTime);
         } catch (Exception ex) {
             LOG.error("fail crawling job history file and continue ...", ex);
@@ -223,7 +221,7 @@ public class JobHistorySpout extends BaseRichSpout {
         //update latest process time
         long minTimeStamp = modifiedTime;
         for (int i = 1; i < numTotalPartitions; i++) {
-            long time = zkState.readProcessedTimeStamp(i);
+            long time = 
JobHistoryZKStateManager.instance().readProcessedTimeStamp(i);
             if (time <= minTimeStamp) {
                 minTimeStamp = time;
             }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c940f56c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
index c61d05a..2e64da3 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
@@ -47,6 +47,8 @@ public class JobHistoryZKStateManager implements 
JobHistoryZKStateLCM {
 
     public static final int BACKOFF_DAYS = 0;
 
+    private static JobHistoryZKStateManager jobHistoryZKStateManager = new 
JobHistoryZKStateManager();
+
     private CuratorFramework newCurator(ZKStateConfig config) throws Exception 
{
         return CuratorFrameworkFactory.newClient(
             config.zkQuorum,
@@ -56,7 +58,11 @@ public class JobHistoryZKStateManager implements 
JobHistoryZKStateLCM {
         );
     }
 
-    public JobHistoryZKStateManager(ZKStateConfig config) {
+    public static JobHistoryZKStateManager instance() {
+        return jobHistoryZKStateManager;
+    }
+
+    public void init(ZKStateConfig config) {
         this.zkRoot = config.zkRoot;
 
         try {

Reply via email to