Repository: lens Updated Branches: refs/heads/master 9d21940be -> 3ab732acc
LENS-1286: Handle Restart cases for Scheduler Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/3ab732ac Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/3ab732ac Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/3ab732ac Branch: refs/heads/master Commit: 3ab732accdfb19b87da0bf5c72e301b0c2a42b84 Parents: 9d21940 Author: Lavkesh Lahngir <lavk...@linux.com> Authored: Wed Sep 14 17:02:00 2016 +0530 Committer: Rajat Khandelwal <rajatgupt...@gmail.com> Committed: Wed Sep 14 17:02:00 2016 +0530 ---------------------------------------------------------------------- .../lens/server/scheduler/SchedulerDAO.java | 180 +++++++++++-------- .../scheduler/SchedulerQueryEventListener.java | 10 +- .../server/scheduler/SchedulerServiceImpl.java | 139 ++++++++++++-- .../server/scheduler/SchedulerRestartTest.java | 124 +++++++++++++ .../scheduler/TestSchedulerServiceImpl.java | 43 +---- .../scheduler/util/SchedulerTestUtils.java | 46 +++++ 6 files changed, 408 insertions(+), 134 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/3ab732ac/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java index 966a64e..b924167 100644 --- a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java +++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java @@ -53,7 +53,7 @@ public class SchedulerDAO { this.conf = conf; try { Class dbStoreClass = Class - .forName(conf.get(LensConfConstants.SCHEDULER_STORE_CLASS, SchedulerHsqlDBStore.class.getName())); + .forName(conf.get(LensConfConstants.SCHEDULER_STORE_CLASS, SchedulerHsqlDBStore.class.getName())); this.store = (SchedulerDBStore) dbStoreClass.newInstance(); this.store.init(UtilityMethods.getDataSourceFromConfForScheduler(conf)); this.store.createJobTable(); @@ -81,7 +81,7 @@ public class SchedulerDAO { try { return store.insertIntoJobTable(jobInfo); } catch (SQLException e) { - log.error("Error while storing the jobInfo for " + jobInfo.getId().getHandleIdString(), e); + log.error("Error while storing the jobInfo for {}", jobInfo.getId().getHandleIdString(), e); return 0; } } @@ -96,7 +96,7 @@ public class SchedulerDAO { try { return store.getSchedulerJobInfo(id.getHandleIdString()); } catch (SQLException e) { - log.error("Error while getting the job detail for " + id.getHandleIdString(), e); + log.error("Error while getting the job detail for {}", id.getHandleIdString(), e); return null; } } @@ -111,7 +111,7 @@ public class SchedulerDAO { try { return store.getJob(id.getHandleIdString()); } catch (SQLException e) { - log.error("Error while getting the job for " + id.getHandleIdString(), e); + log.error("Error while getting the job for {}", id.getHandleIdString(), e); return null; } } @@ -126,7 +126,7 @@ public class SchedulerDAO { try { return store.getUser(id.getHandleIdString()); } catch (SQLException e) { - log.error("Error while getting the user for the job with handle " + id.getHandleIdString(), e); + log.error("Error while getting the user for the job with handle {}", id.getHandleIdString(), e); return null; } } @@ -141,7 +141,7 @@ public class SchedulerDAO { try { return store.getJobState(id.getHandleIdString()); } catch (SQLException e) { - log.error("Error while getting the job status for " + id.getHandleIdString(), e); + log.error("Error while getting the job status for {}", id.getHandleIdString(), e); return null; } } @@ -156,7 +156,7 @@ public class SchedulerDAO { try { return store.updateJob(info.getId().getHandleIdString(), info.getJob(), info.getModifiedOn()); } catch (SQLException e) { - log.error("Error while updating job for " + info.getId().getHandleIdString(), e); + log.error("Error while updating job for {}", info.getId().getHandleIdString(), e); return 0; } } @@ -171,7 +171,7 @@ public class SchedulerDAO { try { return store.updateJobStatus(info.getId().getHandleIdString(), info.getJobState().name(), info.getModifiedOn()); } catch (SQLException e) { - log.error("Error while updating job status for " + info.getId().getHandleIdString(), e); + log.error("Error while updating job status for {}", info.getId().getHandleIdString(), e); return 0; } } @@ -180,7 +180,7 @@ public class SchedulerDAO { try { return store.insertIntoJobInstanceTable(instanceInfo); } catch (SQLException e) { - log.error("Error while storing job instance for " + instanceInfo.getId()); + log.error("Error while storing job instance for {}", instanceInfo.getId(), e); return 0; } } @@ -189,9 +189,9 @@ public class SchedulerDAO { try { return store.insertIntoJobInstanceRunTable(instanceRun); } catch (SQLException e) { - log.error( - "Error while storing job instance run for " + instanceRun.getRunId() + " and instance handle " + instanceRun - .getHandle().getHandleIdString(), e); + log.error("Error while storing job instance run for {} with run id {} ", + instanceRun.getHandle().getHandleIdString(), instanceRun.getRunId(), e); + return 0; } } @@ -206,7 +206,7 @@ public class SchedulerDAO { try { return store.getJobInstanceInfo(id.getHandleIdString()); } catch (SQLException e) { - log.error("Error while getting the job instance info for " + id.getHandleIdString(), e); + log.error("Error while getting the job instance info for {}", id.getHandleIdString(), e); return null; } } @@ -221,8 +221,8 @@ public class SchedulerDAO { try { return store.updateJobInstanceRun(instanceRun); } catch (SQLException e) { - log.error("Error while updating the job instance status for " + instanceRun.getHandle().getHandleIdString() - + " and run: " + instanceRun.getRunId(), e); + log.error("Error while updating the job instance status for {} and run id {}", + instanceRun.getHandle().getHandleIdString(), instanceRun.getRunId(), e); return 0; } } @@ -238,7 +238,7 @@ public class SchedulerDAO { try { return store.getAllJobInstances(id.getHandleIdString()); } catch (SQLException e) { - log.error("Error while getting instances of a job with id " + id.getHandleIdString(), e); + log.error("Error while getting instances of a job with id {}" , id.getHandleIdString(), e); return null; } } @@ -276,6 +276,21 @@ public class SchedulerDAO { } } + /** + * Get all instances matching one of the states + * + * @param states States to be consider for filter + * @return A list of SchedulerJobInstanceRun + */ + public List<SchedulerJobInstanceRun> getInstanceRuns(SchedulerJobInstanceState... states) { + try { + return store.getInstanceRuns(states); + } catch (SQLException e) { + log.error("Error while getting jobs ", e); + return null; + } + } + public abstract static class SchedulerDBStore { protected static final String JOB_TABLE = "job_table"; protected static final String JOB_INSTANCE_TABLE = "job_instance_table"; @@ -356,7 +371,7 @@ public class SchedulerDAO { String insertSQL = "INSERT INTO " + JOB_TABLE + " VALUES(?,?,?,?,?,?,?)"; JAXBElement<XJob> xmlJob = jobFactory.createJob(jobInfo.getJob()); return runner.update(insertSQL, jobInfo.getId().toString(), ToXMLString.toString(xmlJob), jobInfo.getUserName(), - jobInfo.getJobState().name(), jobInfo.getCreatedOn(), jobInfo.getModifiedOn(), jobInfo.getJob().getName()); + jobInfo.getJobState().name(), jobInfo.getCreatedOn(), jobInfo.getModifiedOn(), jobInfo.getJob().getName()); } /** @@ -369,17 +384,17 @@ public class SchedulerDAO { public int insertIntoJobInstanceTable(SchedulerJobInstanceInfo instanceInfo) throws SQLException { String insertSQL = "INSERT INTO " + JOB_INSTANCE_TABLE + " VALUES(?,?,?)"; return runner - .update(insertSQL, instanceInfo.getId().getHandleIdString(), instanceInfo.getJobId().getHandleIdString(), - instanceInfo.getScheduleTime()); + .update(insertSQL, instanceInfo.getId().getHandleIdString(), instanceInfo.getJobId().getHandleIdString(), + instanceInfo.getScheduleTime()); } public int insertIntoJobInstanceRunTable(SchedulerJobInstanceRun instanceRun) throws SQLException { String insetSQL = "INSERT INTO " + JOB_INSTANCE_RUN_TABLE + " VALUES(?,?,?,?,?,?,?,?)"; return runner.update(insetSQL, instanceRun.getHandle().getHandleIdString(), instanceRun.getRunId(), - instanceRun.getSessionHandle() == null ? "" : instanceRun.getSessionHandle().toString(), - instanceRun.getStartTime(), instanceRun.getEndTime(), instanceRun.getResultPath(), - instanceRun.getQueryHandle() == null ? "" : instanceRun.getQueryHandle().getHandleIdString(), - instanceRun.getInstanceState().name()); + instanceRun.getSessionHandle() == null ? "" : instanceRun.getSessionHandle().toString(), + instanceRun.getStartTime(), instanceRun.getEndTime(), instanceRun.getResultPath(), + instanceRun.getQueryHandle() == null ? "" : instanceRun.getQueryHandle().getHandleIdString(), + instanceRun.getInstanceState().name()); } /** @@ -505,8 +520,8 @@ public class SchedulerDAO { */ public int updateJob(String id, XJob job, long modifiedOn) throws SQLException { String updateSQL = - "UPDATE " + JOB_TABLE + " SET " + COLUMN_JOB + "=?, " + COLUMN_MODIFIED_ON + "=? " + " WHERE " + COLUMN_ID - + "=?"; + "UPDATE " + JOB_TABLE + " SET " + COLUMN_JOB + "=?, " + COLUMN_MODIFIED_ON + "=? " + " WHERE " + COLUMN_ID + + "=?"; JAXBElement<XJob> xmlJob = jobFactory.createJob(job); return runner.update(updateSQL, ToXMLString.toString(xmlJob), modifiedOn, id); } @@ -522,8 +537,8 @@ public class SchedulerDAO { */ public int updateJobStatus(String id, String status, long modifiedOn) throws SQLException { String updateSQL = - "UPDATE " + JOB_TABLE + " SET " + COLUMN_STATUS + "=?, " + COLUMN_MODIFIED_ON + "=? " + " WHERE " + COLUMN_ID - + "=?"; + "UPDATE " + JOB_TABLE + " SET " + COLUMN_STATUS + "=?, " + COLUMN_MODIFIED_ON + "=? " + " WHERE " + COLUMN_ID + + "=?"; return runner.update(updateSQL, status, modifiedOn, id); } @@ -552,24 +567,7 @@ public class SchedulerDAO { // Get the Runs String fetchSQL = "SELECT * FROM " + JOB_INSTANCE_RUN_TABLE + " WHERE " + COLUMN_ID + "=?"; List<Object[]> instanceRuns = runner.query(fetchSQL, multipleRowsHandler, (String) instanceInfo[0]); - List<SchedulerJobInstanceRun> runList = new ArrayList<>(); - for (Object[] run : instanceRuns) { - // run[0] will contain the instanceID - int runId = (Integer) run[1]; - LensSessionHandle sessionHandle = LensSessionHandle.valueOf((String) run[2]); - long starttime = (Long) run[3]; - long endtime = (Long) run[4]; - String resultPath = (String) run[5]; - String queryHandleString = (String) run[6]; - QueryHandle queryHandle = null; - if (!queryHandleString.isEmpty()) { - queryHandle = QueryHandle.fromString((String) run[6]); - } - SchedulerJobInstanceState instanceStatus = SchedulerJobInstanceState.valueOf((String) run[7]); - SchedulerJobInstanceRun instanceRun = new SchedulerJobInstanceRun(id, runId, sessionHandle, starttime, endtime, - resultPath, queryHandle, instanceStatus); - runList.add(instanceRun); - } + List<SchedulerJobInstanceRun> runList = processInstanceRun(instanceRuns); return new SchedulerJobInstanceInfo(id, jobId, createdOn, runList); } @@ -582,13 +580,13 @@ public class SchedulerDAO { */ public int updateJobInstanceRun(SchedulerJobInstanceRun instanceRun) throws SQLException { String updateSQL = - "UPDATE " + JOB_INSTANCE_RUN_TABLE + " SET " + COLUMN_END_TIME + "=?, " + COLUMN_RESULT_PATH + "=?, " - + COLUMN_QUERY_HANDLE + "=?, " + COLUMN_STATUS + "=?" + " WHERE " + COLUMN_ID + "=? AND " + COLUMN_RUN_ID - + "=?"; + "UPDATE " + JOB_INSTANCE_RUN_TABLE + " SET " + COLUMN_END_TIME + "=?, " + COLUMN_RESULT_PATH + "=?, " + + COLUMN_QUERY_HANDLE + "=?, " + COLUMN_STATUS + "=?" + " WHERE " + COLUMN_ID + "=? AND " + COLUMN_RUN_ID + + "=?"; return runner.update(updateSQL, instanceRun.getEndTime(), instanceRun.getResultPath(), - instanceRun.getQueryHandle() == null ? "" : instanceRun.getQueryHandle().getHandleIdString(), - instanceRun.getInstanceState().name(), instanceRun.getHandle().getHandleIdString(), instanceRun.getRunId()); + instanceRun.getQueryHandle() == null ? "" : instanceRun.getQueryHandle().getHandleIdString(), + instanceRun.getInstanceState().name(), instanceRun.getHandle().getHandleIdString(), instanceRun.getRunId()); } /** @@ -625,6 +623,39 @@ public class SchedulerDAO { return (String) result.get(0)[0]; } } + + private List<SchedulerJobInstanceRun> processInstanceRun(List<Object[]> instanceRuns) throws SQLException { + List<SchedulerJobInstanceRun> runList = new ArrayList<>(); + for (Object[] run : instanceRuns) { + // run[0] will contain the instanceID + SchedulerJobInstanceHandle id = SchedulerJobInstanceHandle.fromString((String) run[0]); + int runId = (Integer) run[1]; + LensSessionHandle sessionHandle = LensSessionHandle.valueOf((String) run[2]); + long starttime = (Long) run[3]; + long endtime = (Long) run[4]; + String resultPath = (String) run[5]; + String queryHandleString = (String) run[6]; + QueryHandle queryHandle = null; + if (!queryHandleString.isEmpty()) { + queryHandle = QueryHandle.fromString((String) run[6]); + } + SchedulerJobInstanceState instanceStatus = SchedulerJobInstanceState.valueOf((String) run[7]); + SchedulerJobInstanceRun instanceRun = new SchedulerJobInstanceRun(id, runId, sessionHandle, starttime, endtime, + resultPath, queryHandle, instanceStatus); + runList.add(instanceRun); + } + return runList; + } + + public List<SchedulerJobInstanceRun> getInstanceRuns(SchedulerJobInstanceState[] states) throws SQLException { + String whereClause = ""; + for (SchedulerJobInstanceState state : states) { + whereClause += ((whereClause.isEmpty()) ? " WHERE " : " OR ") + COLUMN_STATUS + " = '" + state + "'"; + } + String fetchSQL = "SELECT * FROM " + JOB_INSTANCE_RUN_TABLE + whereClause; + List<Object[]> instanceRuns = runner.query(fetchSQL, multipleRowsHandler); + return processInstanceRun(instanceRuns); + } } /** @@ -637,10 +668,10 @@ public class SchedulerDAO { @Override public void createJobTable() throws SQLException { String createSQL = - "CREATE TABLE IF NOT EXISTS " + JOB_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL," + COLUMN_JOB - + " TEXT," + COLUMN_USER + " VARCHAR(255)," + COLUMN_STATUS + " VARCHAR(20)," + COLUMN_CREATED_ON - + " BIGINT, " + COLUMN_MODIFIED_ON + " BIGINT, " + COLUMN_JOB_NAME + " VARCHAR(255), " + " PRIMARY KEY ( " - + COLUMN_ID + ")" + ")"; + "CREATE TABLE IF NOT EXISTS " + JOB_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL," + COLUMN_JOB + " TEXT," + + COLUMN_USER + " VARCHAR(255)," + COLUMN_STATUS + " VARCHAR(20)," + COLUMN_CREATED_ON + " BIGINT, " + + COLUMN_MODIFIED_ON + " BIGINT, " + COLUMN_JOB_NAME + " VARCHAR(255), " + " PRIMARY KEY ( " + COLUMN_ID + ")" + + ")"; runner.update(createSQL); } @@ -650,9 +681,9 @@ public class SchedulerDAO { @Override public void createJobInstanceTable() throws SQLException { String createSQL = - "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL, " - + COLUMN_JOB_ID + " VARCHAR(255) NOT NULL, " + COLUMN_SCHEDULE_TIME + " BIGINT, " + " PRIMARY KEY ( " - + COLUMN_ID + ")" + ")"; + "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL, " + + COLUMN_JOB_ID + " VARCHAR(255) NOT NULL, " + COLUMN_SCHEDULE_TIME + " BIGINT, " + " PRIMARY KEY ( " + + COLUMN_ID + ")" + ")"; runner.update(createSQL); } @@ -662,12 +693,11 @@ public class SchedulerDAO { @Override public void createJobInstanceRunTable() throws SQLException { String createSQL = - "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_RUN_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL, " - + COLUMN_RUN_ID + " INT NOT NULL, " + COLUMN_SESSION_HANDLE + " VARCHAR(255), " + COLUMN_START_TIME - + " BIGINT, " + COLUMN_END_TIME + " BIGINT, " + COLUMN_RESULT_PATH + " TEXT, " + COLUMN_QUERY_HANDLE - + " VARCHAR(255), " + COLUMN_STATUS + " VARCHAR(20), " + " PRIMARY KEY ( " + COLUMN_ID + ", " - + COLUMN_RUN_ID - + ")" + ")"; + "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_RUN_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL, " + + COLUMN_RUN_ID + " INT NOT NULL, " + COLUMN_SESSION_HANDLE + " VARCHAR(255), " + COLUMN_START_TIME + + " BIGINT, " + COLUMN_END_TIME + " BIGINT, " + COLUMN_RESULT_PATH + " TEXT, " + COLUMN_QUERY_HANDLE + + " VARCHAR(255), " + COLUMN_STATUS + " VARCHAR(20), " + " PRIMARY KEY ( " + COLUMN_ID + ", " + COLUMN_RUN_ID + + ")" + ")"; runner.update(createSQL); } } @@ -682,10 +712,10 @@ public class SchedulerDAO { @Override public void createJobTable() throws SQLException { String createSQL = - "CREATE TABLE IF NOT EXISTS " + JOB_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL," + COLUMN_JOB - + " VARCHAR(1024)," + COLUMN_USER + " VARCHAR(255)," + COLUMN_STATUS + " VARCHAR(20)," + COLUMN_CREATED_ON - + " BIGINT, " + COLUMN_MODIFIED_ON + " BIGINT, " + COLUMN_JOB_NAME + " VARCHAR(255), " + " PRIMARY KEY ( " - + COLUMN_ID + ")" + ")"; + "CREATE TABLE IF NOT EXISTS " + JOB_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL," + COLUMN_JOB + + " VARCHAR(1024)," + COLUMN_USER + " VARCHAR(255)," + COLUMN_STATUS + " VARCHAR(20)," + COLUMN_CREATED_ON + + " BIGINT, " + COLUMN_MODIFIED_ON + " BIGINT, " + COLUMN_JOB_NAME + " VARCHAR(255), " + " PRIMARY KEY ( " + + COLUMN_ID + ")" + ")"; runner.update(createSQL); } @@ -695,9 +725,9 @@ public class SchedulerDAO { @Override public void createJobInstanceTable() throws SQLException { String createSQL = - "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL, " - + COLUMN_JOB_ID + " VARCHAR(255) NOT NULL, " + COLUMN_SCHEDULE_TIME + " BIGINT, " + " PRIMARY KEY ( " - + COLUMN_ID + ")" + ")"; + "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL, " + + COLUMN_JOB_ID + " VARCHAR(255) NOT NULL, " + COLUMN_SCHEDULE_TIME + " BIGINT, " + " PRIMARY KEY ( " + + COLUMN_ID + ")" + ")"; runner.update(createSQL); } @@ -707,11 +737,11 @@ public class SchedulerDAO { @Override public void createJobInstanceRunTable() throws SQLException { String createSQL = - "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_RUN_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL, " - + COLUMN_RUN_ID + " INT NOT NULL, " + COLUMN_SESSION_HANDLE + " VARCHAR(255), " + COLUMN_START_TIME - + " BIGINT, " + COLUMN_END_TIME + " BIGINT, " + COLUMN_RESULT_PATH + " VARCHAR(1024)," - + COLUMN_QUERY_HANDLE + " VARCHAR(255), " + COLUMN_STATUS + " VARCHAR(20), " + " PRIMARY KEY ( " - + COLUMN_ID + ", " + COLUMN_RUN_ID + " )" + ")"; + "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_RUN_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL, " + + COLUMN_RUN_ID + " INT NOT NULL, " + COLUMN_SESSION_HANDLE + " VARCHAR(255), " + COLUMN_START_TIME + + " BIGINT, " + COLUMN_END_TIME + " BIGINT, " + COLUMN_RESULT_PATH + " VARCHAR(1024)," + COLUMN_QUERY_HANDLE + + " VARCHAR(255), " + COLUMN_STATUS + " VARCHAR(20), " + " PRIMARY KEY ( " + COLUMN_ID + ", " + COLUMN_RUN_ID + + " )" + ")"; runner.update(createSQL); } } http://git-wip-us.apache.org/repos/asf/lens/blob/3ab732ac/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java index 077d531..4192134 100644 --- a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java +++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java @@ -79,9 +79,13 @@ public class SchedulerQueryEventListener extends AsyncEventListener<QueryEnded> latestRun.setEndTime(System.currentTimeMillis()); latestRun.setInstanceState(state); latestRun.setResultPath(queryContext.getResultSetPath()); - schedulerDAO.updateJobInstanceRun(latestRun); - log.info("Updated instance run {} for instance {} for job {} to {}", latestRun.getRunId(), info.getId(), - info.getJobId(), state); + if (schedulerDAO.updateJobInstanceRun(latestRun) == 1) { + log.info("Updated instance run {} for instance {} for job {} to {}", latestRun.getRunId(), info.getId(), + info.getJobId(), state); + } else { + log.error("Failed to update instance run {} for instance {} for job {} to {}", latestRun.getRunId(), + info.getId(), info.getJobId(), state); + } } catch (InvalidStateTransitionException e) { log.error("Instance Transition Failed ", e); } http://git-wip-us.apache.org/repos/asf/lens/blob/3ab732ac/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java index 9cee0c2..969d740 100644 --- a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java @@ -29,7 +29,9 @@ import org.apache.lens.api.LensConf; import org.apache.lens.api.LensSessionHandle; import org.apache.lens.api.error.InvalidStateTransitionException; import org.apache.lens.api.error.LensCommonErrorCode; +import org.apache.lens.api.query.LensQuery; import org.apache.lens.api.query.QueryHandle; +import org.apache.lens.api.query.QueryStatus; import org.apache.lens.api.scheduler.*; import org.apache.lens.cube.parse.CubeQueryConfUtil; import org.apache.lens.server.BaseLensService; @@ -86,13 +88,13 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe super(NAME, cliService); } + @Override public synchronized void init(HiveConf hiveConf) { super.init(hiveConf); try { schedulerDAO = new SchedulerDAO(hiveConf); alarmService = LensServices.get().getService(AlarmService.NAME); queryService = LensServices.get().getService(QueryExecutionService.NAME); - // Get the listeners' classes from the configuration. this.schedulerEventListener = new SchedulerEventListener(schedulerDAO); this.schedulerQueryEventListener = new SchedulerQueryEventListener(schedulerDAO); getEventService().addListenerForType(schedulerEventListener, SchedulerAlarmEvent.class); @@ -105,14 +107,112 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe private void doesSessionBelongToUser(LensSessionHandle sessionHandle, String user) throws LensException { LensSessionImpl session = getSession(sessionHandle); if (!session.getLoggedInUser().equals(user)) { + log.warn("Session User {} is not equal to Job owner {}", session.getLoggedInUser(), user); throw new LensException(LensSchedulerErrorCode.CURRENT_USER_IS_NOT_SAME_AS_OWNER.getLensErrorInfo(), null, session.getLoggedInUser(), sessionHandle.getPublicId().toString(), user); } } + /** + * How the restarts are handled? + * Get all the instances with state Running or New. + * If They are running then check the query status. If Query is finished, take query parameters and update the + * instance. If query is still running then do nothing. + * If the state is New then Kill the instance and rerun it. + */ @Override public synchronized void start() { super.start(); + List<SchedulerJobInstanceRun> instanceRuns = schedulerDAO + .getInstanceRuns(SchedulerJobInstanceState.WAITING, SchedulerJobInstanceState.LAUNCHED, + SchedulerJobInstanceState.RUNNING); + for (SchedulerJobInstanceRun run : instanceRuns) { + LensSessionHandle sessionHandle = null; + try { + SchedulerJobInstanceInfo instanceInfo = schedulerDAO.getSchedulerJobInstanceInfo(run.getHandle()); + log.info("Recovering instance {} of job {} ", instanceInfo.getId(), instanceInfo.getJobId()); + switch (run.getInstanceState()) { + case WAITING: + case LAUNCHED: + // Kill and rerun + if (updateInstanceRun(run, SchedulerJobInstanceState.KILLED)) { + notifyRerun(instanceInfo); + log.info("Re-running instance {} of job {}", instanceInfo.getId(), instanceInfo.getJobId()); + } else { + log.error("Not able to recover instance {} of job {}", instanceInfo.getId(), instanceInfo.getJobId()); + } + break; + case RUNNING: + sessionHandle = openSessionAsUser(schedulerDAO.getUser(instanceInfo.getJobId())); + if (!checkQueryState(sessionHandle, run)) { + log.info("Re-running instance {} of job {}", instanceInfo.getId(), instanceInfo.getJobId()); + notifyRerun(instanceInfo); + } + break; + } + } catch (LensException e) { + log.error("Not able to recover instance {} ", run.getHandle().getHandleIdString(), e); + } finally { + try { + if (sessionHandle != null) { + closeSession(sessionHandle); + } + } catch (Exception e) { + log.error("Error closing session ", e); + } + } + } + } + + /** + * If query is not found of is invalid then rerun again else get the status and update correspondingly. + * + * @param sessionHandle + * @param run + * @return + * @throws LensException + */ + private boolean checkQueryState(LensSessionHandle sessionHandle, SchedulerJobInstanceRun run) throws LensException { + QueryHandle queryHandle = run.getQueryHandle(); + LensQuery query = null; + try { + query = this.queryService.getQuery(sessionHandle, queryHandle); + } catch (Exception e) { + updateInstanceRun(run, SchedulerJobInstanceState.KILLED); + return false; + } + if (query == null) { + // This means we have no idea what happened to query + // Mark it as Killed. + updateInstanceRun(run, SchedulerJobInstanceState.KILLED); + return false; + } + QueryStatus.Status status = query.getStatus().getStatus(); + SchedulerJobInstanceState state = run.getInstanceState(); + switch (status) { + case NEW: + case QUEUED: + case LAUNCHED: + case RUNNING: + case EXECUTED: + break; + case CANCELED: + state = SchedulerJobInstanceState.KILLED; + break; + case SUCCESSFUL: + state = SchedulerJobInstanceState.SUCCEEDED; + break; + case FAILED: + state = SchedulerJobInstanceState.FAILED; + break; + default: + // This should not happen + log.warn("Unexpected status {} for the query id {}", status, queryHandle); + state = SchedulerJobInstanceState.KILLED; + } + run.setResultPath(query.getResultSetPath()); + updateInstanceRun(run, state); + return true; } /** @@ -150,9 +250,8 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe validateJob(job); SchedulerJobHandle handle = UtilityMethods.generateSchedulerJobHandle(); long createdOn = System.currentTimeMillis(); - long modifiedOn = createdOn; SchedulerJobInfo info = new SchedulerJobInfo(handle, job, session.getLoggedInUser(), SchedulerJobState.NEW, - createdOn, modifiedOn); + createdOn, createdOn); if (schedulerDAO.storeJob(info) == 1) { log.info("Successfully submitted job with handle {}", handle); return handle; @@ -335,9 +434,7 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe SchedulerJobInstanceRun latestRun = runList.get(runList.size() - 1); try { latestRun.getInstanceState().nextTransition(ON_RERUN); - getEventService().notifyEvent( - new SchedulerAlarmEvent(instanceInfo.getJobId(), new DateTime(instanceInfo.getScheduleTime()), - SchedulerAlarmEvent.EventType.SCHEDULE, instanceHandle)); + notifyRerun(instanceInfo); log.info("Rerunning the instance with {} for job {} ", instanceHandle, instanceInfo.getJobId()); } catch (InvalidStateTransitionException e) { throw new LensException(LensSchedulerErrorCode.INVALID_EVENT_FOR_JOB_INSTANCE.getLensErrorInfo(), e, @@ -379,16 +476,22 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe } QueryHandle handle = latestRun.getQueryHandle(); if (handle == null || handle.getHandleIdString().isEmpty()) { - latestRun.setEndTime(System.currentTimeMillis()); - latestRun.setInstanceState(state); - schedulerDAO.updateJobInstanceRun(latestRun); - log.info("Killing instance with {} for job {} ", instanceHandle, instanceInfo.getJobId()); - return true; + log.info("Killing instance {} for job {} ", instanceInfo.getId(), instanceInfo.getJobId()); + return updateInstanceRun(latestRun, state); + } else { + log.info("Killing instance {} for job {} with query handle {} ", instanceInfo.getId(), + instanceInfo.getJobId(), handle); + // This will cause the QueryEnd event which will set the status of the instance to KILLED. + return queryService.cancelQuery(sessionHandle, handle); } - log.info("Killing instance with {} for job {} with query handle {} ", instanceHandle, instanceInfo.getJobId(), - handle); - // This will cause the QueryEnd event which will set the status of the instance to KILLED. - return queryService.cancelQuery(sessionHandle, handle); + + } + + private boolean updateInstanceRun(SchedulerJobInstanceRun latestRun, SchedulerJobInstanceState state) + throws LensException { + latestRun.setEndTime(System.currentTimeMillis()); + latestRun.setInstanceState(state); + return schedulerDAO.updateJobInstanceRun(latestRun) == 1; } /** @@ -418,4 +521,10 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe currentState.name(), info.getId().getHandleIdString()); } } + + private void notifyRerun(SchedulerJobInstanceInfo instanceInfo) throws LensException { + getEventService().notifyEvent( + new SchedulerAlarmEvent(instanceInfo.getJobId(), new DateTime(instanceInfo.getScheduleTime()), + SchedulerAlarmEvent.EventType.SCHEDULE, instanceInfo.getId())); + } } http://git-wip-us.apache.org/repos/asf/lens/blob/3ab732ac/lens-server/src/test/java/org/apache/lens/server/scheduler/SchedulerRestartTest.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/scheduler/SchedulerRestartTest.java b/lens-server/src/test/java/org/apache/lens/server/scheduler/SchedulerRestartTest.java new file mode 100644 index 0000000..de36499 --- /dev/null +++ b/lens-server/src/test/java/org/apache/lens/server/scheduler/SchedulerRestartTest.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lens.server.scheduler; + +import static org.apache.lens.server.scheduler.util.SchedulerTestUtils.getTestJob; +import static org.apache.lens.server.scheduler.util.SchedulerTestUtils.setupQueryService; + +import static org.mockito.Matchers.any; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.lens.api.LensSessionHandle; +import org.apache.lens.api.query.LensQuery; +import org.apache.lens.api.query.QueryHandle; +import org.apache.lens.api.query.QueryStatus; +import org.apache.lens.api.scheduler.*; +import org.apache.lens.server.LensServerConf; +import org.apache.lens.server.LensServices; +import org.apache.lens.server.api.LensConfConstants; +import org.apache.lens.server.api.metrics.LensMetricsUtil; +import org.apache.lens.server.api.scheduler.SchedulerService; +import org.apache.lens.server.model.LogSegregationContext; +import org.apache.lens.server.model.MappedDiagnosticLogSegregationContext; +import org.apache.lens.server.util.UtilityMethods; + +import org.joda.time.DateTime; +import org.powermock.api.mockito.PowerMockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Test(groups = "scheduler-restart", dependsOnGroups = "duplicate-query") +public class SchedulerRestartTest { + + SchedulerServiceImpl scheduler; + + @BeforeClass + public void setUp() throws Exception { + System.setProperty(LensConfConstants.CONFIG_LOCATION, "target/test-classes/"); + LensServices.get().init(LensServerConf.getHiveConf()); + scheduler = LensServices.get().getService(SchedulerService.NAME); + setupQueryService(scheduler); + LensServices.get().start(); + } + + @AfterClass + public void tearDown() throws Exception { + } + + @Test + public void testRestart() throws Exception { + long start = new DateTime().getMillis() - 3600000 * 24; + // One day + long end = start + 2 * 3600000 * 24; + XJob testJob = getTestJob("0 0 0 * * ?", "test query", start, end); + long currentTime = System.currentTimeMillis(); + LensSessionHandle sessionHandle = scheduler.openSessionAsUser("admin"); + SchedulerJobHandle jobHandle = scheduler.submitAndScheduleJob(sessionHandle, testJob); + Thread.sleep(5000); + List<SchedulerJobInstanceInfo> instanceInfoList = scheduler.getJobInstances(jobHandle, 10L); + Assert.assertEquals(instanceInfoList.size(), 1); + + // Store new instance + SchedulerJobInstanceHandle instanceHandle = UtilityMethods.generateSchedulerJobInstanceHandle(); + SchedulerJobInstanceInfo instance = new SchedulerJobInstanceInfo(instanceHandle, jobHandle, currentTime, + new ArrayList<SchedulerJobInstanceRun>()); + SchedulerDAO store = scheduler.getSchedulerDAO(); + // Manually Store instance + store.storeJobInstance(instance); + SchedulerJobInstanceRun run = new SchedulerJobInstanceRun(instanceHandle, instance.getInstanceRunList().size() + 1, + null, currentTime, 0, "N/A", null, SchedulerJobInstanceState.WAITING); + instance.getInstanceRunList().add(run); + store.storeJobInstanceRun(run); + + // Restart Lens Services + LensServices.get().stop(); + LensMetricsUtil.clearRegistry(); + LogSegregationContext logSegregationContext = new MappedDiagnosticLogSegregationContext(); + LensServices.setInstance(new LensServices(LensServices.LENS_SERVICES_NAME, logSegregationContext)); + LensServices.get().init(LensServerConf.getHiveConf()); + scheduler = LensServices.get().getService(SchedulerService.NAME); + setupQueryService(scheduler); + LensQuery mockedQuery = PowerMockito.mock(LensQuery.class); + QueryStatus mockStatus = PowerMockito.mock(QueryStatus.class); + PowerMockito.when(mockStatus.getStatus()).thenReturn(QueryStatus.Status.SUCCESSFUL); + PowerMockito.when(mockedQuery.getStatus()).thenReturn(mockStatus); + PowerMockito.when(mockedQuery.getStatus().getStatus()).thenReturn(QueryStatus.Status.SUCCESSFUL); + PowerMockito.when(mockedQuery.getResultSetPath()).thenReturn("/tmp/path"); + PowerMockito.when(scheduler.getQueryService().getQuery(any(LensSessionHandle.class), any(QueryHandle.class))) + .thenReturn(mockedQuery); + LensServices.get().start(); + + // Sleep for some time to let the event get processed + Thread.sleep(5000); + // This should have 2 instance Run + SchedulerJobInstanceInfo storedInfo = scheduler.getInstanceDetails(instanceHandle); + Assert.assertEquals(storedInfo.getInstanceRunList().size(), 2); + // The first instance will be killed state after restart. + SchedulerJobInstanceInfo previousInstanceInfo = scheduler.getInstanceDetails(instanceInfoList.get(0).getId()); + //Because we mocked the query, It should not rerun. + Assert.assertEquals(previousInstanceInfo.getInstanceRunList().size(), 1); + Assert.assertEquals(previousInstanceInfo.getInstanceRunList().get(0).getResultPath(), "/tmp/path"); + Assert.assertEquals(previousInstanceInfo.getInstanceRunList().get(0).getInstanceState(), + SchedulerJobInstanceState.SUCCEEDED); + } +} http://git-wip-us.apache.org/repos/asf/lens/blob/3ab732ac/lens-server/src/test/java/org/apache/lens/server/scheduler/TestSchedulerServiceImpl.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/scheduler/TestSchedulerServiceImpl.java b/lens-server/src/test/java/org/apache/lens/server/scheduler/TestSchedulerServiceImpl.java index 130df5f..aa5c897 100644 --- a/lens-server/src/test/java/org/apache/lens/server/scheduler/TestSchedulerServiceImpl.java +++ b/lens-server/src/test/java/org/apache/lens/server/scheduler/TestSchedulerServiceImpl.java @@ -18,31 +18,19 @@ */ package org.apache.lens.server.scheduler; -import static org.apache.lens.server.scheduler.util.SchedulerTestUtils.getTestJob; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; +import static org.apache.lens.server.scheduler.util.SchedulerTestUtils.*; import java.util.List; -import java.util.UUID; -import org.apache.lens.api.LensConf; import org.apache.lens.api.LensSessionHandle; -import org.apache.lens.api.query.QueryHandle; import org.apache.lens.api.query.QueryStatus; import org.apache.lens.api.scheduler.*; import org.apache.lens.server.EventServiceImpl; import org.apache.lens.server.LensServerConf; import org.apache.lens.server.LensServices; import org.apache.lens.server.api.LensConfConstants; -import org.apache.lens.server.api.query.QueryContext; -import org.apache.lens.server.api.query.QueryEnded; -import org.apache.lens.server.api.query.QueryExecutionService; import org.apache.lens.server.api.scheduler.SchedulerService; -import org.apache.hadoop.conf.Configuration; - -import org.powermock.api.mockito.PowerMockito; import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -63,34 +51,7 @@ public class TestSchedulerServiceImpl { System.setProperty(LensConfConstants.CONFIG_LOCATION, "target/test-classes/"); } - private void setupQueryService() throws Exception { - QueryExecutionService queryExecutionService = PowerMockito.mock(QueryExecutionService.class); - scheduler.setQueryService(queryExecutionService); - PowerMockito.when( - scheduler.getQueryService().estimate(anyString(), any(LensSessionHandle.class), anyString(), any(LensConf.class))) - .thenReturn(null); - PowerMockito.when(scheduler.getQueryService() - .executeAsync(any(LensSessionHandle.class), anyString(), any(LensConf.class), anyString())) - .thenReturn(new QueryHandle(UUID.randomUUID())); - PowerMockito.when(scheduler.getQueryService().cancelQuery(any(LensSessionHandle.class), any(QueryHandle.class))) - .thenReturn(true); - scheduler.getSchedulerEventListener().setQueryService(queryExecutionService); - } - private QueryEnded mockQueryEnded(SchedulerJobInstanceHandle instanceHandle, QueryStatus.Status status) { - QueryContext mockContext = PowerMockito.mock(QueryContext.class); - PowerMockito.when(mockContext.getResultSetPath()).thenReturn("/tmp/query1/result"); - Configuration conf = new Configuration(); - // set the instance handle - conf.set("job_instance_key", instanceHandle.getHandleIdString()); - PowerMockito.when(mockContext.getConf()).thenReturn(conf); - // Get the queryHandle. - PowerMockito.when(mockContext.getQueryHandle()).thenReturn(new QueryHandle(UUID.randomUUID())); - QueryEnded queryEnded = PowerMockito.mock(QueryEnded.class); - PowerMockito.when(queryEnded.getQueryContext()).thenReturn(mockContext); - PowerMockito.when(queryEnded.getCurrentValue()).thenReturn(status); - return queryEnded; - } @Test(priority = 1) public void testScheduler() throws Exception { @@ -98,7 +59,7 @@ public class TestSchedulerServiceImpl { LensServices.get().start(); scheduler = LensServices.get().getService(SchedulerService.NAME); eventService = LensServices.get().getService(EventServiceImpl.NAME); - setupQueryService(); + setupQueryService(scheduler); LensSessionHandle sessionHandle = scheduler.openSessionAsUser(user); long currentTime = System.currentTimeMillis(); XJob job = getTestJob("0/5 * * * * ?", queryString, currentTime, currentTime + 15000); http://git-wip-us.apache.org/repos/asf/lens/blob/3ab732ac/lens-server/src/test/java/org/apache/lens/server/scheduler/util/SchedulerTestUtils.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/scheduler/util/SchedulerTestUtils.java b/lens-server/src/test/java/org/apache/lens/server/scheduler/util/SchedulerTestUtils.java index a36b2aa..d50474a 100644 --- a/lens-server/src/test/java/org/apache/lens/server/scheduler/util/SchedulerTestUtils.java +++ b/lens-server/src/test/java/org/apache/lens/server/scheduler/util/SchedulerTestUtils.java @@ -18,18 +18,35 @@ */ package org.apache.lens.server.scheduler.util; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; + import java.util.GregorianCalendar; +import java.util.UUID; import javax.xml.datatype.DatatypeFactory; import javax.xml.datatype.XMLGregorianCalendar; +import org.apache.lens.api.LensConf; +import org.apache.lens.api.LensSessionHandle; +import org.apache.lens.api.query.QueryHandle; +import org.apache.lens.api.query.QueryStatus; import org.apache.lens.api.scheduler.*; +import org.apache.lens.server.api.query.QueryContext; +import org.apache.lens.server.api.query.QueryEnded; +import org.apache.lens.server.api.query.QueryExecutionService; +import org.apache.lens.server.scheduler.SchedulerServiceImpl; + +import org.apache.hadoop.conf.Configuration; + +import org.powermock.api.mockito.PowerMockito; public class SchedulerTestUtils { private SchedulerTestUtils() { } + private static XTrigger getTestTrigger(String cron) { XTrigger trigger = new XTrigger(); XFrequency frequency = new XFrequency(); @@ -67,4 +84,33 @@ public class SchedulerTestUtils { job.setExecution(getTestExecution(query)); return job; } + + public static void setupQueryService(SchedulerServiceImpl scheduler) throws Exception { + QueryExecutionService queryExecutionService = PowerMockito.mock(QueryExecutionService.class); + scheduler.setQueryService(queryExecutionService); + PowerMockito + .when(queryExecutionService.estimate(anyString(), any(LensSessionHandle.class), anyString(), any(LensConf.class))) + .thenReturn(null); + PowerMockito.when( + queryExecutionService.executeAsync(any(LensSessionHandle.class), anyString(), any(LensConf.class), anyString())) + .thenReturn(new QueryHandle(UUID.randomUUID())); + PowerMockito.when(queryExecutionService.cancelQuery(any(LensSessionHandle.class), any(QueryHandle.class))) + .thenReturn(true); + scheduler.getSchedulerEventListener().setQueryService(queryExecutionService); + } + + public static QueryEnded mockQueryEnded(SchedulerJobInstanceHandle instanceHandle, QueryStatus.Status status) { + QueryContext mockContext = PowerMockito.mock(QueryContext.class); + PowerMockito.when(mockContext.getResultSetPath()).thenReturn("/tmp/query1/result"); + Configuration conf = new Configuration(); + // set the instance handle + conf.set("job_instance_key", instanceHandle.getHandleIdString()); + PowerMockito.when(mockContext.getConf()).thenReturn(conf); + // Get the queryHandle. + PowerMockito.when(mockContext.getQueryHandle()).thenReturn(new QueryHandle(UUID.randomUUID())); + QueryEnded queryEnded = PowerMockito.mock(QueryEnded.class); + PowerMockito.when(queryEnded.getQueryContext()).thenReturn(mockContext); + PowerMockito.when(queryEnded.getCurrentValue()).thenReturn(status); + return queryEnded; + } }