[EAGLE-515] refactor zk manager to singleton

Author: wujinhu <wujinhu...@126.com>
Closes #411 from wujinhu/EAGLE-515.

Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/c940f56c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/c940f56c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/c940f56c

Branch: refs/heads/master
Commit: c940f56c27296c5fec657bd240e96c5522199262
Parents: 8de69a8
Author: wujinhu <wujinhu...@126.com>
Authored: Thu Sep 1 11:26:10 2016 +0800
Committer: Hao Chen <h...@apache.org>
Committed: Thu Sep 1 11:26:10 2016 +0800

----------------------------------------------------------------------
 .../mr/history/crawler/JHFCrawlerDriverImpl.java | 17 +++++++----------
 .../JobEntityCreationEagleServiceListener.java | 4 +---
 .../jpm/mr/history/storm/JobHistorySpout.java | 10 ++++------
 .../mr/history/zkres/JobHistoryZKStateManager.java | 8 +++++++-
 4 files changed, 19 insertions(+), 20 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c940f56c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
index 1a17751..077f4e1 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
@@ -20,7 +20,7 @@ package org.apache.eagle.jpm.mr.history.crawler;
 
 import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.history.parser.EagleJobStatus;
-import
org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateLCM; +import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager; import org.apache.eagle.jpm.mr.historyentity.JobCountEntity; import org.apache.eagle.jpm.util.JobIdFilter; import org.apache.commons.lang3.tuple.Pair; @@ -58,7 +58,6 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver { private JHFInputStreamCallback reader; protected boolean zeroBasedMonth = true; - private JobHistoryZKStateLCM zkStateLcm; private JobHistoryLCM jhfLCM; private JobIdFilter jobFilter; private int partitionId; @@ -69,7 +68,6 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver { public JHFCrawlerDriverImpl(MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig, MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig, MRHistoryJobConfig.ControlConfig controlConfig, JHFInputStreamCallback reader, - JobHistoryZKStateLCM zkStateLCM, JobHistoryLCM historyLCM, JobIdFilter jobFilter, int partitionId) throws Exception { this.eagleServiceConfig = eagleServiceConfig; this.jobExtractorConfig = jobExtractorConfig; @@ -80,7 +78,6 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver { } this.reader = reader; jhfLCM = historyLCM;//new JobHistoryDAOImpl(jobHistoryConfig); - this.zkStateLcm = zkStateLCM; this.partitionId = partitionId; this.jobFilter = jobFilter; timeZone = TimeZone.getTimeZone(controlConfig.timeZone); @@ -187,7 +184,7 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver { jobHistoryFile, reader); } - zkStateLcm.addProcessedJob(String.format(FORMAT_JOB_PROCESS_DATE, + JobHistoryZKStateManager.instance().addProcessedJob(String.format(FORMAT_JOB_PROCESS_DATE, this.processDate.year, this.processDate.month + 1, this.processDate.day), @@ -202,7 +199,7 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver { private void updateProcessDate() throws Exception { String line = String.format(FORMAT_JOB_PROCESS_DATE, this.processDate.year, this.processDate.month + 1, 
this.processDate.day); - zkStateLcm.updateProcessedDate(partitionId, line); + JobHistoryZKStateManager.instance().updateProcessedDate(partitionId, line); } private int getActualMonth(int month) { @@ -220,7 +217,7 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver { } private void readAndCacheLastProcessedDate() throws Exception { - String lastProcessedDate = zkStateLcm.readProcessedDate(partitionId); + String lastProcessedDate = JobHistoryZKStateManager.instance().readProcessedDate(partitionId); Matcher m = PATTERN_JOB_PROCESS_DATE.matcher(lastProcessedDate); if (m.find() && m.groupCount() == 3) { this.processDate.year = Integer.parseInt(m.group(1)); @@ -233,7 +230,7 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver { GregorianCalendar cal = new GregorianCalendar(timeZone); cal.set(this.processDate.year, this.processDate.month, this.processDate.day, 0, 0, 0); cal.add(Calendar.DATE, 1); - List<String> list = zkStateLcm.readProcessedJobs(String.format(FORMAT_JOB_PROCESS_DATE, cal.get(Calendar.YEAR), + List<String> list = JobHistoryZKStateManager.instance().readProcessedJobs(String.format(FORMAT_JOB_PROCESS_DATE, cal.get(Calendar.YEAR), cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DAY_OF_MONTH))); if (list != null) { this.processedJobFileNames = new HashSet<>(list); @@ -241,7 +238,7 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver { } private void flushJobCount() throws Exception { - List<Pair<String, String>> jobs = zkStateLcm.getProcessedJobs( + List<Pair<String, String>> jobs = JobHistoryZKStateManager.instance().getProcessedJobs( String.format(FORMAT_JOB_PROCESS_DATE, this.processDate.year, this.processDate.month + 1, this.processDate.day) ); JobCountEntity entity = new JobCountEntity(); @@ -300,7 +297,7 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver { cal.add(Calendar.DATE, -1 - PROCESSED_JOB_KEEP_DAYS); String line = String.format(FORMAT_JOB_PROCESS_DATE, cal.get(Calendar.YEAR), 
cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DAY_OF_MONTH)); - zkStateLcm.truncateProcessedJob(line); + JobHistoryZKStateManager.instance().truncateProcessedJob(line); } private boolean isToday() { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c940f56c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java index 520fbbc..30eeb54 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java @@ -90,13 +90,12 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr eagleServiceConfig.password); client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000); - JobHistoryZKStateManager zkState = new JobHistoryZKStateManager(configManager.getZkStateConfig()); logger.info("start flushing entities of total number " + list.size()); for (int i = 0; i < list.size(); i++) { JobBaseAPIEntity entity = list.get(i); if (entity instanceof JobExecutionAPIEntity) { jobs.add((JobExecutionAPIEntity) entity); - zkState.updateProcessedJob(timeStamp2Date(entity.getTimestamp()), + JobHistoryZKStateManager.instance().updateProcessedJob(timeStamp2Date(entity.getTimestamp()), entity.getTags().get(MRJobTagName.JOB_ID.toString()), ((JobExecutionAPIEntity) entity).getCurrentState()); } else if (entity instanceof JobEventAPIEntity) { @@ -107,7 +106,6 @@ public class JobEntityCreationEagleServiceListener implements 
HistoryJobEntityCr taskAttemptExecs.add((TaskAttemptExecutionAPIEntity) entity); } } - zkState.close(); GenericServiceAPIResponseEntity result; if (jobs.size() > 0) { logger.info("flush JobExecutionAPIEntity of number " + jobs.size()); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c940f56c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java index 402f93e..04283d3 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java @@ -87,7 +87,6 @@ public class JobHistorySpout extends BaseRichSpout { private int partitionId; private int numTotalPartitions; - private transient JobHistoryZKStateManager zkState; private transient JHFCrawlerDriver driver; private JobHistoryContentFilter contentFilter; private JobHistorySpoutCollectorInterceptor interceptor; @@ -143,8 +142,8 @@ public class JobHistorySpout extends BaseRichSpout { throw new IllegalStateException(e); } JobIdFilter jobIdFilter = new JobIdFilterByPartition(partitioner, numTotalPartitions, partitionId); - zkState = new JobHistoryZKStateManager(configManager.getZkStateConfig()); - zkState.ensureJobPartitions(numTotalPartitions); + JobHistoryZKStateManager.instance().init(configManager.getZkStateConfig()); + JobHistoryZKStateManager.instance().ensureJobPartitions(numTotalPartitions); interceptor.setSpoutOutputCollector(collector); try { @@ -154,7 +153,6 @@ public class JobHistorySpout extends BaseRichSpout { configManager.getJobExtractorConfig(), configManager.getControlConfig(), callback, - zkState, 
jhfLCM, jobIdFilter, partitionId); @@ -168,7 +166,7 @@ public class JobHistorySpout extends BaseRichSpout { public void nextTuple() { try { Long modifiedTime = driver.crawl(); - zkState.updateProcessedTimeStamp(partitionId, modifiedTime); + JobHistoryZKStateManager.instance().updateProcessedTimeStamp(partitionId, modifiedTime); updateProcessedTimeStamp(modifiedTime); } catch (Exception ex) { LOG.error("fail crawling job history file and continue ...", ex); @@ -223,7 +221,7 @@ public class JobHistorySpout extends BaseRichSpout { //update latest process time long minTimeStamp = modifiedTime; for (int i = 1; i < numTotalPartitions; i++) { - long time = zkState.readProcessedTimeStamp(i); + long time = JobHistoryZKStateManager.instance().readProcessedTimeStamp(i); if (time <= minTimeStamp) { minTimeStamp = time; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c940f56c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java index c61d05a..2e64da3 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java @@ -47,6 +47,8 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { public static final int BACKOFF_DAYS = 0; + private static JobHistoryZKStateManager jobHistoryZKStateManager = new JobHistoryZKStateManager(); + private CuratorFramework newCurator(ZKStateConfig config) throws Exception { return CuratorFrameworkFactory.newClient( config.zkQuorum, @@ -56,7 +58,11 @@ public class 
JobHistoryZKStateManager implements JobHistoryZKStateLCM { ); } - public JobHistoryZKStateManager(ZKStateConfig config) { + public static JobHistoryZKStateManager instance() { + return jobHistoryZKStateManager; + } + + public void init(ZKStateConfig config) { this.zkRoot = config.zkRoot; try {