OOZIE-2863 SLACalculatorMemory.loadOnRestart causing delay in server start (puru via satishsaley)
(cherry picked from commit 8df784b5fb15273af31d8c104f0f7c47c78e82a3) core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/f553c4e5 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/f553c4e5 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/f553c4e5 Branch: refs/heads/branch-4.3 Commit: f553c4e565b935d827fe172dba4a29a1499de1ae Parents: 4b3b30d Author: satishsaley <satishsa...@apache.org> Authored: Wed Apr 19 23:11:29 2017 -0700 Committer: satishsaley <satishsa...@apache.org> Committed: Fri Dec 8 16:34:55 2017 -0800 ---------------------------------------------------------------------- .../org/apache/oozie/sla/SLACalcStatus.java | 65 ++++--- .../apache/oozie/sla/SLACalculatorMemory.java | 192 ++++++++++--------- .../apache/oozie/sla/service/SLAService.java | 5 + .../apache/oozie/service/TestHASLAService.java | 3 + .../oozie/sla/TestSLACalculatorMemory.java | 22 ++- release-log.txt | 1 + 6 files changed, 166 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/f553c4e5/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java index 3a76dfe..7be16f0 100644 --- a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java +++ b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java @@ -36,6 +36,7 @@ public class SLACalcStatus extends SLAEvent { public static String SLA_ENTITYKEY_PREFIX = "sla-"; private SLARegistrationBean regBean; + private SLASummaryBean summary; private String jobStatus; private SLAStatus slaStatus; private EventStatus eventStatus; @@ -44,6 +45,7 @@ public class SLACalcStatus extends SLAEvent { private long 
actualDuration = -1; private Date lastModifiedTime; private byte eventProcessed; + private String jobId; private XLog LOG; @@ -54,27 +56,13 @@ public class SLACalcStatus extends SLAEvent { } public SLACalcStatus(SLASummaryBean summary, SLARegistrationBean regBean) { + this(summary); + updateSLARegistrationBean(regBean); + LOG = LogUtils.setLogPrefix(LOG, this); + } + + public SLACalcStatus(SLASummaryBean summary) { this(); - SLARegistrationBean reg = new SLARegistrationBean(); - reg.setNotificationMsg(regBean.getNotificationMsg()); - reg.setUpstreamApps(regBean.getUpstreamApps()); - reg.setAlertContact(regBean.getAlertContact()); - reg.setAlertEvents(regBean.getAlertEvents()); - reg.setJobData(regBean.getJobData()); - if (regBean.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT)) { - reg.addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT, - regBean.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT)); - } - reg.setId(summary.getId()); - reg.setAppType(summary.getAppType()); - reg.setUser(summary.getUser()); - reg.setAppName(summary.getAppName()); - reg.setParentId(summary.getParentId()); - reg.setNominalTime(summary.getNominalTime()); - reg.setExpectedStart(summary.getExpectedStart()); - reg.setExpectedEnd(summary.getExpectedEnd()); - reg.setExpectedDuration(summary.getExpectedDuration()); - setSLARegistrationBean(reg); setActualStart(summary.getActualStart()); setActualEnd(summary.getActualEnd()); setActualDuration(summary.getActualDuration()); @@ -83,7 +71,8 @@ public class SLACalcStatus extends SLAEvent { setEventStatus(summary.getEventStatus()); setLastModifiedTime(summary.getLastModifiedTime()); setEventProcessed(summary.getEventProcessed()); - LOG = LogUtils.setLogPrefix(LOG, this); + setId(summary.getId()); + this.summary = summary; } /** @@ -112,17 +101,24 @@ public class SLACalcStatus extends SLAEvent { return regBean; } + public SLASummaryBean getSLASummaryBean() { + return summary; + } + public void setSLARegistrationBean(SLARegistrationBean 
slaBean) { + if (slaBean != null) { + this.jobId = slaBean.getId(); + } this.regBean = slaBean; } @Override public String getId() { - return regBean.getId(); + return jobId; } public void setId(String id) { - regBean.setId(id); + this.jobId = id; } @Override @@ -288,4 +284,27 @@ public class SLACalcStatus extends SLAEvent { return SLA_ENTITYKEY_PREFIX + this.getId(); } + public void updateSLARegistrationBean(SLARegistrationBean slaBean) { + SLARegistrationBean reg = new SLARegistrationBean(); + reg.setNotificationMsg(slaBean.getNotificationMsg()); + reg.setUpstreamApps(slaBean.getUpstreamApps()); + reg.setAlertContact(slaBean.getAlertContact()); + reg.setAlertEvents(slaBean.getAlertEvents()); + reg.setJobData(slaBean.getJobData()); + if (slaBean.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT)) { + reg.addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT, + slaBean.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT)); + } + reg.setId(summary.getId()); + reg.setAppType(summary.getAppType()); + reg.setUser(summary.getUser()); + reg.setAppName(summary.getAppName()); + reg.setParentId(summary.getParentId()); + reg.setNominalTime(summary.getNominalTime()); + reg.setExpectedStart(summary.getExpectedStart()); + reg.setExpectedEnd(summary.getExpectedEnd()); + reg.setExpectedDuration(summary.getExpectedDuration()); + setSLARegistrationBean(reg); + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/f553c4e5/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java index 3522ffe..7fb3f0e 100644 --- a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java +++ b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java @@ -134,42 +134,14 @@ public class SLACalculatorMemory implements SLACalculator { } private void loadOnRestart() 
{ - long slaPendingCount = 0; - long statusPendingCount = 0; - try { - List<SLASummaryBean> summaryBeans = jpaService.execute(new SLASummaryGetRecordsOnRestartJPAExecutor( - modifiedAfter)); + List<SLASummaryBean> summaryBeans = jpaService + .execute(new SLASummaryGetRecordsOnRestartJPAExecutor(modifiedAfter)); for (SLASummaryBean summaryBean : summaryBeans) { String jobId = summaryBean.getId(); - - SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get( - SLARegQuery.GET_SLA_REG_ON_RESTART, jobId); - SLACalcStatus slaCalcStatus = new SLACalcStatus(summaryBean, slaRegBean); - - // Processed missed jobs - try { - SLAXCommandFactory.getSLAEventXCommand(slaCalcStatus).call(); - } - catch (Throwable e) { - LOG.error("Error while updating job {0}", slaCalcStatus.getId(), e); - } - - if (slaCalcStatus.getEventProcessed() == 7) { - historySet.add(jobId); - statusPendingCount++; - LOG.debug("Adding job [{0}] to historySet. EventProcessed is [{1}]", slaCalcStatus, - slaCalcStatus); - } - else if (slaCalcStatus.getEventProcessed() < 7) { - slaMap.put(jobId, slaCalcStatus); - slaPendingCount++; - LOG.debug("Adding job [{0}] to slamap. 
EventProcessed is [{1}]", slaCalcStatus, - slaCalcStatus); - - } + slaMap.put(jobId, new SLACalcStatus(summaryBean)); } - LOG.info("Loaded SLASummary pendingSLA=" + slaPendingCount + ", pendingStatusUpdate=" + statusPendingCount); + LOG.info("Loaded {0} SLASummary object after restart", slaMap.size()); } catch (Exception e) { LOG.warn("Failed to retrieve SLASummary records on restart", e); @@ -198,13 +170,24 @@ public class SLACalculatorMemory implements SLACalculator { return memObj; } - private SLACalcStatus getSLACalcStatus(String jobId) throws JPAExecutorException { + /** + * Get SLACalcStatus from map if SLARegistration is not null, else create a new SLACalcStatus + * This function doesn't update slaMap + * @param jobId + * @return + * @throws JPAExecutorException + */ + private SLACalcStatus getOrCreateSLACalcStatus(String jobId) throws JPAExecutorException { SLACalcStatus memObj; memObj = slaMap.get(jobId); - if (memObj == null) { - memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance() - .get(SLASummaryQuery.GET_SLA_SUMMARY, jobId), SLARegistrationQueryExecutor.getInstance().get( - SLARegQuery.GET_SLA_REG_ON_RESTART, jobId)); + // if the request came from immediately after restart don't use map SLACalcStatus. + if (memObj == null || memObj.getSLARegistrationBean() == null) { + SLARegistrationBean registrationBean = SLARegistrationQueryExecutor.getInstance() + .get(SLARegQuery.GET_SLA_REG_ON_RESTART, jobId); + SLASummaryBean summaryBean = memObj == null + ? 
SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId) + : memObj.getSLASummaryBean(); + return new SLACalcStatus(summaryBean, registrationBean); } return memObj; } @@ -235,52 +218,55 @@ public class SLACalculatorMemory implements SLACalculator { // job might be processed and removed from map by addJobStatus return; } - synchronized (slaCalc) { - // get eventProcessed on DB for validation in HA - SLASummaryBean summaryBean = null; - try { - summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).get( - SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId); + boolean firstCheckAfterRetstart = checkAndUpdateSLACalcAfterRestart(slaCalc); + // get eventProcessed on DB for validation in HA + SLASummaryBean summaryBean = null; + try { + summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()) + .get(SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId); + } + catch (JPAExecutorException e) { + if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) { + LOG.debug("job [{0}] is is not in DB, removing from Memory", jobId); + slaMap.remove(jobId); + return; } - catch (JPAExecutorException e) { - if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) { - LOG.debug("job [{0}] is is not in DB, removing from Memory", jobId); - slaMap.remove(jobId); - return; - } - throw e; + throw e; + } + byte eventProc = summaryBean.getEventProcessed(); + slaCalc.setEventProcessed(eventProc); + if (eventProc >= 7) { + if (eventProc == 7) { + historySet.add(jobId); } - byte eventProc = summaryBean.getEventProcessed(); - slaCalc.setEventProcessed(eventProc); - if (eventProc >= 7) { - if (eventProc == 7) { - historySet.add(jobId); - } - slaMap.remove(jobId); - LOG.trace("Removed Job [{0}] from map as SLA processed", jobId); + slaMap.remove(jobId); + LOG.trace("Removed Job [{0}] from map as SLA processed", jobId); + } + else { 
+ if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) { + // Update last modified time. + slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime()); + reloadExpectedTimeAndConfig(slaCalc); + LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB"); } - else { - if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) { - // Update last modified time. - slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime()); - reloadExpectedTimeAndConfig(slaCalc); - LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB"); + if (firstCheckAfterRetstart || isChanged(slaCalc)) { + LOG.debug("{0} job has SLA event change. EventProc = {1}, status = {2}", slaCalc.getId(), + slaCalc.getEventProcessed(), slaCalc.getJobStatus()); + try { + SLAXCommandFactory.getSLAEventXCommand(slaCalc).call(); + checkEventProc(slaCalc); } - if (isChanged(slaCalc)) { - LOG.debug("{0} job has SLA event change. 
EventProc = {1}, status = {2}", slaCalc.getId(), - slaCalc.getEventProcessed(), slaCalc.getJobStatus()); - try { - SLAXCommandFactory.getSLAEventXCommand(slaCalc).call(); - checkEventProc(slaCalc); + catch (XException e) { + if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) { + LOG.debug("job [{0}] is is not in DB, removing from Memory", slaCalc.getId()); + slaMap.remove(jobId); } - catch (XException e) { - if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) { - LOG.debug("job [{0}] is is not in DB, removing from Memory", slaCalc.getId()); - slaMap.remove(jobId); + else { + if (firstCheckAfterRetstart) { + slaCalc.setSLARegistrationBean(null); } } } - } } } @@ -473,7 +459,7 @@ public class SLACalculatorMemory implements SLACalculator { "Received addJobStatus request for job [{0}] jobStatus = [{1}], jobEventStatus = [{2}], startTime = [{3}], " + "endTime = [{4}] ", jobId, jobStatus, jobEventStatus, startTime, endTime); SLACalcStatus slaCalc = slaMap.get(jobId); - + boolean firstCheckAfterRetstart = checkAndUpdateSLACalcAfterRestart(slaCalc); if (slaCalc == null) { SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get( SLARegQuery.GET_SLA_REG_ALL, jobId); @@ -505,6 +491,9 @@ public class SLACalculatorMemory implements SLACalculator { checkEventProc(slaCalc); } catch (XException e) { + if (firstCheckAfterRetstart) { + slaCalc.setSLARegistrationBean(null); + } LOG.error(e); throw new ServiceException(e); } @@ -570,12 +559,10 @@ public class SLACalculatorMemory implements SLACalculator { @SuppressWarnings("rawtypes") List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>(); for (String jobId : jobIds) { - SLACalcStatus slaCalc = getSLACalcStatus(jobId); - if (slaCalc != null) { - slaCalc.getSLARegistrationBean().removeFromSLAConfigMap(OozieClient.SLA_DISABLE_ALERT); - updateDBSlaConfig(slaCalc, updateList); - isJobFound = true; - } + 
SLACalcStatus slaCalc = getOrCreateSLACalcStatus(jobId); + slaCalc.getSLARegistrationBean().removeFromSLAConfigMap(OozieClient.SLA_DISABLE_ALERT); + updateDBSlaConfig(slaCalc, updateList); + isJobFound = true; } executeBatchQuery(updateList); return isJobFound; @@ -593,13 +580,10 @@ public class SLACalculatorMemory implements SLACalculator { List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>(); for (String jobId : jobIds) { - SLACalcStatus slaCalc = getSLACalcStatus(jobId); - if (slaCalc != null) { - slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT, - Boolean.toString(true)); - updateDBSlaConfig(slaCalc, updateList); - isJobFound = true; - } + SLACalcStatus slaCalc = getOrCreateSLACalcStatus(jobId); + slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT, Boolean.toString(true)); + updateDBSlaConfig(slaCalc, updateList); + isJobFound = true; } executeBatchQuery(updateList); return isJobFound; @@ -617,12 +601,10 @@ public class SLACalculatorMemory implements SLACalculator { @SuppressWarnings("rawtypes") List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>(); for (Pair<String, Map<String, String>> jobIdSLAPair : jobIdsSLAPair) { - SLACalcStatus slaCalc = getSLACalcStatus(jobIdSLAPair.getFist()); - if (slaCalc != null) { - updateParams(slaCalc, jobIdSLAPair.getSecond()); - updateDBSlaExpectedValues(slaCalc, updateList); - isJobFound = true; - } + SLACalcStatus slaCalc = getOrCreateSLACalcStatus(jobIdSLAPair.getFist()); + updateParams(slaCalc, jobIdSLAPair.getSecond()); + updateDBSlaExpectedValues(slaCalc, updateList); + isJobFound = true; } executeBatchQuery(updateList); return isJobFound; @@ -655,4 +637,24 @@ public class SLACalculatorMemory implements SLACalculator { return childJobIds; } + private boolean checkAndUpdateSLACalcAfterRestart(SLACalcStatus slaCalc) throws JPAExecutorException { + if (slaCalc != null && slaCalc.getSLARegistrationBean() == null) { 
+ return updateSLARegistartion(slaCalc); + } + return false; + } + + public boolean updateSLARegistartion(SLACalcStatus slaCalc) throws JPAExecutorException { + if (slaCalc.getSLARegistrationBean() == null) { + synchronized (slaCalc) { + if (slaCalc.getSLARegistrationBean() == null) { + SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance() + .get(SLARegQuery.GET_SLA_REG_ON_RESTART, slaCalc.getId()); + slaCalc.updateSLARegistrationBean(slaRegBean); + return true; + } + } + } + return false; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/f553c4e5/core/src/main/java/org/apache/oozie/sla/service/SLAService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/sla/service/SLAService.java b/core/src/main/java/org/apache/oozie/sla/service/SLAService.java index 08cd07e..2d23a22 100644 --- a/core/src/main/java/org/apache/oozie/sla/service/SLAService.java +++ b/core/src/main/java/org/apache/oozie/sla/service/SLAService.java @@ -114,6 +114,11 @@ public class SLAService implements Service { new SLAWorker(calcImpl).run(); } + @VisibleForTesting + public void startSLAWorker() { + new Thread(new SLAWorker(calcImpl)).start(); + } + private class SLAWorker implements Runnable { SLACalculator calc; http://git-wip-us.apache.org/repos/asf/oozie/blob/f553c4e5/core/src/test/java/org/apache/oozie/service/TestHASLAService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestHASLAService.java b/core/src/test/java/org/apache/oozie/service/TestHASLAService.java index 3af263e..67fa56e 100644 --- a/core/src/test/java/org/apache/oozie/service/TestHASLAService.java +++ b/core/src/test/java/org/apache/oozie/service/TestHASLAService.java @@ -207,6 +207,7 @@ public class TestHASLAService extends ZKXTestCase { SLAService slas = Services.get().get(SLAService.class); SLACalculatorMemory slaCalcMem = 
(SLACalculatorMemory) slas.getSLACalculator(); slaCalcMem.init(Services.get().getConf()); + slaCalcMem.updateAllSlaStatus(); List<String> slaMapKeys = new ArrayList<String>(); Iterator<String> itr = slaCalcMem.iterator(); while (itr.hasNext()) { @@ -226,6 +227,7 @@ public class TestHASLAService extends ZKXTestCase { dummySlaCalcMem.setEventHandlerService(dummyEhs); dummyEhs.init(Services.get()); dummySlaCalcMem.init(Services.get().getConf()); + dummySlaCalcMem.updateAllSlaStatus(); slaMapKeys = new ArrayList<String>(); itr = dummySlaCalcMem.iterator(); while (itr.hasNext()) { @@ -402,6 +404,7 @@ public class TestHASLAService extends ZKXTestCase { SLAService slas = Services.get().get(SLAService.class); SLACalculatorMemory slaCalcMem1 = (SLACalculatorMemory) slas.getSLACalculator(); slaCalcMem1.init(Services.get().get(ConfigurationService.class).getConf()); + slaCalcMem1.updateAllSlaStatus(); List<String> idList = new ArrayList<String>(); idList.add(id); slaCalcMem1.disableAlert(idList); http://git-wip-us.apache.org/repos/asf/oozie/blob/f553c4e5/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java b/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java index 559e2b3..34011f6 100644 --- a/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java +++ b/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java @@ -144,6 +144,7 @@ public class TestSLACalculatorMemory extends XDataTestCase { slaCalcMemory.addRegistration(jobId2, slaRegBean2); slaCalcMemory.addRegistration(jobId3, slaRegBean3); + slaCalcMemory.updateAllSlaStatus(); SLACalcStatus calc1 = slaCalcMemory.get(jobId1); SLACalcStatus calc2 = slaCalcMemory.get(jobId2); SLACalcStatus calc3 = slaCalcMemory.get(jobId3); @@ -190,7 +191,7 @@ public class TestSLACalculatorMemory extends XDataTestCase { slaCalcMemory = 
new SLACalculatorMemory(); slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf()); - + slaCalcMemory.updateAllSlaStatus(); SLACalcStatus calc = new SLACalcStatus(SLASummaryQueryExecutor.getInstance().get( SLASummaryQuery.GET_SLA_SUMMARY, jobId1), SLARegistrationQueryExecutor.getInstance().get( @@ -250,7 +251,7 @@ public class TestSLACalculatorMemory extends XDataTestCase { SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory(); slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf()); SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1-W", AppType.WORKFLOW_JOB); - String jobId1 = slaRegBean1.getId(); + final String jobId1 = slaRegBean1.getId(); slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07")); slaRegBean1.setExpectedStart(sdf.parse("2012-03-07")); slaCalcMemory.addRegistration(jobId1, slaRegBean1); @@ -276,6 +277,8 @@ public class TestSLACalculatorMemory extends XDataTestCase { slaCalcMemory = new SLACalculatorMemory(); slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf()); + slaCalcMemory.updateAllSlaStatus(); + slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1); assertEquals("job-1-W", slaSummary.getId()); assertEquals(8, slaSummary.getEventProcessed()); @@ -302,6 +305,7 @@ public class TestSLACalculatorMemory extends XDataTestCase { slaCalcMemory = new SLACalculatorMemory(); slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf()); + slaCalcMemory.updateAllSlaStatus(); slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1); assertEquals("FAILED", slaSummary.getJobStatus()); @@ -323,8 +327,16 @@ public class TestSLACalculatorMemory extends XDataTestCase { SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, slaSummaryBean); - slaCalcMemory = new SLACalculatorMemory(); - 
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf()); + SLAService slaService = Services.get().get(SLAService.class); + slaService.startSLAWorker(); + slaService.addStatusEvent(jobId1, "RUNNING", null, null, null); + waitFor(60 * 1000, new Predicate() { + public boolean evaluate() throws Exception { + return SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1) + .getEventProcessed() == 7; + } + }); + slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, slaSummaryBean.getId()); //since job is already running and it's a old job assertEquals(7, slaSummary.getEventProcessed()); @@ -484,6 +496,8 @@ public class TestSLACalculatorMemory extends XDataTestCase { CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_FOR_START, coordAction2); slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf()); + slaCalcMemory.updateAllSlaStatus(); + slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1); slaSummary2 = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId2); http://git-wip-us.apache.org/repos/asf/oozie/blob/f553c4e5/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 0649b46..2b36ec0 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.3.1 release +OOZIE-2863 SLACalculatorMemory.loadOnRestart causing delay in server start (puru via satishsaley) OOZIE-2862 Coord change command doesn't change job to running if job was killed without creating any actions (puru) OOZIE-2811 amend Add support for filtering out properties from SparkConfigurationService OOZIE-2807 Oozie gets RM delegation token even for checking job status (satishsaley)