Repository: lens Updated Branches: refs/heads/current-release-line 05533d1a0 -> d37e84994
LENS-1265: Scheduler Bug fixes (followup to LENS-128) Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/ebf018f9 Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/ebf018f9 Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/ebf018f9 Branch: refs/heads/current-release-line Commit: ebf018f97621eccd1ca423e1bbfdc036b6b46c2c Parents: 05533d1 Author: Lavkesh Lahngir <lavk...@linux.com> Authored: Wed Aug 10 12:26:29 2016 +0530 Committer: Archana H <archa...@apache.org> Committed: Wed Aug 17 13:40:43 2016 +0530 ---------------------------------------------------------------------- .../lens/api/scheduler/SchedulerJobInfo.java | 6 +- .../api/scheduler/SchedulerJobInstanceInfo.java | 5 + .../api/scheduler/SchedulerJobInstanceRun.java | 5 + .../scheduler/SchedulerJobInstanceState.java | 3 + .../lens/api/scheduler/SchedulerJobState.java | 3 + .../src/main/resources/scheduler-job-0.1.xsd | 14 +- .../server/api/scheduler/SchedulerService.java | 25 ++-- .../lens/server/scheduler/AlarmService.java | 25 ++-- .../lens/server/scheduler/ScheduleResource.java | 34 ++--- .../lens/server/scheduler/SchedulerDAO.java | 10 +- .../scheduler/SchedulerEventListener.java | 129 +++++++++++-------- .../scheduler/SchedulerQueryEventListener.java | 8 +- .../server/scheduler/SchedulerServiceImpl.java | 98 ++++++++++---- .../lens/server/scheduler/AlarmServiceTest.java | 5 +- .../scheduler/TestSchedulerServiceImpl.java | 37 +++--- 15 files changed, 256 insertions(+), 151 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/ebf018f9/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInfo.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInfo.java b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInfo.java index b19248f..50562f4 100644 --- a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInfo.java +++ b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInfo.java @@ -18,14 +18,19 @@ */ package org.apache.lens.api.scheduler; +import javax.xml.bind.annotation.XmlRootElement; + import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; /** * POJO to represent the <code>job</code> table in the database. */ @Data @AllArgsConstructor +@NoArgsConstructor +@XmlRootElement public class SchedulerJobInfo { /** @@ -65,5 +70,4 @@ public class SchedulerJobInfo { * @return last modified time for this job */ private long modifiedOn; - } http://git-wip-us.apache.org/repos/asf/lens/blob/ebf018f9/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceInfo.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceInfo.java b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceInfo.java index 52b56ca..9148af1 100644 --- a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceInfo.java +++ b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceInfo.java @@ -20,14 +20,19 @@ package org.apache.lens.api.scheduler; import java.util.List; +import javax.xml.bind.annotation.XmlRootElement; + import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; /** * POJO for an instance of SchedulerJob. */ @Data @AllArgsConstructor +@NoArgsConstructor +@XmlRootElement public class SchedulerJobInstanceInfo { /** http://git-wip-us.apache.org/repos/asf/lens/blob/ebf018f9/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceRun.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceRun.java b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceRun.java index e6c1571..8532ed0 100644 --- a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceRun.java +++ b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceRun.java @@ -18,16 +18,21 @@ */ package org.apache.lens.api.scheduler; +import javax.xml.bind.annotation.XmlRootElement; + import org.apache.lens.api.LensSessionHandle; import org.apache.lens.api.query.QueryHandle; import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; @Data @AllArgsConstructor @EqualsAndHashCode +@NoArgsConstructor +@XmlRootElement public class SchedulerJobInstanceRun { /** http://git-wip-us.apache.org/repos/asf/lens/blob/ebf018f9/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceState.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceState.java b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceState.java index 93d3d7e..3d2605e 100644 --- a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceState.java +++ b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceState.java @@ -18,8 +18,11 @@ */ package org.apache.lens.api.scheduler; +import javax.xml.bind.annotation.*; + import org.apache.lens.api.error.InvalidStateTransitionException; +@XmlRootElement public enum SchedulerJobInstanceState implements StateTransitioner<SchedulerJobInstanceState, SchedulerJobInstanceEvent> { // repeating same operation will return the same state to ensure idempotent behavior. http://git-wip-us.apache.org/repos/asf/lens/blob/ebf018f9/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobState.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobState.java b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobState.java index ffaae6c..f4fcce1 100644 --- a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobState.java +++ b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobState.java @@ -18,8 +18,11 @@ */ package org.apache.lens.api.scheduler; +import javax.xml.bind.annotation.*; + import org.apache.lens.api.error.InvalidStateTransitionException; +@XmlRootElement public enum SchedulerJobState implements StateTransitioner<SchedulerJobState, SchedulerJobEvent> { // repeating same operation will return the same state to ensure idempotent behavior. NEW { http://git-wip-us.apache.org/repos/asf/lens/blob/ebf018f9/lens-api/src/main/resources/scheduler-job-0.1.xsd ---------------------------------------------------------------------- diff --git a/lens-api/src/main/resources/scheduler-job-0.1.xsd b/lens-api/src/main/resources/scheduler-job-0.1.xsd index 4e6c68b..31f7d66 100644 --- a/lens-api/src/main/resources/scheduler-job-0.1.xsd +++ b/lens-api/src/main/resources/scheduler-job-0.1.xsd @@ -126,7 +126,7 @@ </xs:documentation> </xs:annotation> </xs:element> - <xs:element type="xs:string" name="resource_path" minOccurs="0" maxOccurs="unbounded"> + <xs:element type="resource_path" name="resource_path" minOccurs="0" maxOccurs="unbounded"> <xs:annotation> <xs:documentation> Path for resources like jars etc. e.g. /path/to/my/jar @@ -136,6 +136,18 @@ </xs:sequence> </xs:complexType> + <xs:complexType name="resource_path"> + <xs:annotation> + <xs:documentation> + A resource path with file name and type + </xs:documentation> + </xs:annotation> + <xs:sequence> + <xs:element name="path" type="xs:string"/> + <xs:element name="type" type="xs:string"/> + </xs:sequence> + </xs:complexType> + <xs:complexType name="x_job_query"> <xs:annotation> <xs:documentation> http://git-wip-us.apache.org/repos/asf/lens/blob/ebf018f9/lens-server-api/src/main/java/org/apache/lens/server/api/scheduler/SchedulerService.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/scheduler/SchedulerService.java b/lens-server-api/src/main/java/org/apache/lens/server/api/scheduler/SchedulerService.java index c7f73eb..8e1606e 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/scheduler/SchedulerService.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/scheduler/SchedulerService.java @@ -70,22 +70,20 @@ public interface SchedulerService extends LensService, SessionValidator { /** * Returns the definition of a job. * - * @param sessionHandle handle for the session. * @param jobHandle handle for the job * @return job definition * @throws LensException the lens exception */ - XJob getJobDefinition(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException; + XJob getJobDefinition(SchedulerJobHandle jobHandle) throws LensException; /** * Returns the details of a job. Details may contain extra system information like id for the job. * - * @param sessionHandle handle for the session. * @param jobHandle handle for the job * @return job details for the job * @throws LensException the lens exception */ - SchedulerJobInfo getJobDetails(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException; + SchedulerJobInfo getJobDetails(SchedulerJobHandle jobHandle) throws LensException; /** * Update a job with new definition. @@ -146,7 +144,6 @@ public interface SchedulerService extends LensService, SessionValidator { boolean deleteJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException; /** - * @param sessionHandle handle for the current session. * @param state filter for status, if specified only jobs in that state will be returned, * if null no entries will be removed from result * @param user filter for user who submitted the job, if specified only jobs submitted by the given user @@ -158,13 +155,12 @@ public interface SchedulerService extends LensService, SessionValidator { * @return A collection of stats per job * @throws LensException */ - Collection<SchedulerJobStats> getAllJobStats(LensSessionHandle sessionHandle, String state, String user, + Collection<SchedulerJobStats> getAllJobStats(String state, String user, String jobName, long startTime, long endTime) throws LensException; /** * Returns stats for a job. * - * @param sessionHandle handle for session. * @param handle handle for the job * @param state filter for status, if specified only jobs in that state will be returned, * if null no entries will be removed from result @@ -172,19 +168,18 @@ public interface SchedulerService extends LensService, SessionValidator { * @param endTime if specified only instances with scheduleTime before this time will be considered. * @throws LensException the lens exception */ - SchedulerJobStats getJobStats(LensSessionHandle sessionHandle, SchedulerJobHandle handle, String state, + SchedulerJobStats getJobStats(SchedulerJobHandle handle, String state, long startTime, long endTime) throws LensException; /** * Returns handles for last <code>numResults</code> instances for the job. * - * @param sessionHandle handle for the session. * @param jobHandle handle for the job * @param numResults - number of results to be returned, default 100. * @return list of instance ids for the job * @throws LensException the lens exception */ - List<SchedulerJobInstanceInfo> getJobInstances(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle, + List<SchedulerJobInstanceInfo> getJobInstances(SchedulerJobHandle jobHandle, Long numResults) throws LensException; /** @@ -215,12 +210,16 @@ public interface SchedulerService extends LensService, SessionValidator { /** * Instance details for an instance. * - * @param sessionHandle handle for the session. * @param instanceHandle handle for the instance. * @return details for the instance. * @throws LensException the lens exception */ - SchedulerJobInstanceInfo getInstanceDetails(LensSessionHandle sessionHandle, - SchedulerJobInstanceHandle instanceHandle) throws LensException; + SchedulerJobInstanceInfo getInstanceDetails(SchedulerJobInstanceHandle instanceHandle) throws LensException; + /** + * Create session as user for scheduling the job with no auth. + * @param user + * @return LensSessionHandle + */ + LensSessionHandle openSessionAsUser(String user) throws LensException; } http://git-wip-us.apache.org/repos/asf/lens/blob/ebf018f9/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java index 4491261..2009a20 100644 --- a/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java +++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java @@ -138,13 +138,14 @@ public class AlarmService extends AbstractService implements LensService { Trigger trigger; if (frequency.getEnum() != null) { //for enum expression: create a trigger using calendar interval CalendarIntervalScheduleBuilder scheduleBuilder = CalendarIntervalScheduleBuilder.calendarIntervalSchedule() - .withInterval(getTimeInterval(frequency.getEnum()), getTimeUnit(frequency.getEnum())) - .withMisfireHandlingInstructionIgnoreMisfires(); + .withInterval(getTimeInterval(frequency.getEnum()), getTimeUnit(frequency.getEnum())) + .withMisfireHandlingInstructionIgnoreMisfires(); trigger = TriggerBuilder.newTrigger().withIdentity(jobHandle, ALARM_SERVICE).startAt(start.toDate()) - .endAt(end.toDate()).withSchedule(scheduleBuilder).build(); + .endAt(end.toDate()).withSchedule(scheduleBuilder).build(); } else { // for cron expression create a cron trigger trigger = TriggerBuilder.newTrigger().withIdentity(jobHandle, ALARM_SERVICE).startAt(start.toDate()) - .endAt(end.toDate()).withSchedule(CronScheduleBuilder.cronSchedule(frequency.getCronExpression())).build(); + .endAt(end.toDate()).withSchedule(CronScheduleBuilder.cronSchedule(frequency.getCronExpression()) + .withMisfireHandlingInstructionIgnoreMisfires()).build(); } // Tell quartz to run the job using our trigger @@ -190,7 +191,7 @@ public class AlarmService extends AbstractService implements LensService { try { return scheduler.deleteJob(JobKey.jobKey(jobHandle.getHandleIdString(), LENS_JOBS)); } catch (SchedulerException e) { - log.error("Failed to remove alarm triggers for job with jobHandle: " + jobHandle, e); + log.error("Failed to remove alarm triggers for job with jobHandle: {}", jobHandle); throw new LensException("Failed to remove alarm triggers for job with jobHandle: " + jobHandle, e); } } @@ -199,8 +200,8 @@ public class AlarmService extends AbstractService implements LensService { try { return scheduler.checkExists(JobKey.jobKey(handle.getHandleIdString(), LENS_JOBS)); } catch (SchedulerException e) { - log.error("Failed to check the job with jobHandle: " + handle, e); - throw new LensException("Failed to check the job with jobHandle: " + handle, e); + log.error("Failed to check the job with jobHandle: {}", handle); + return false; } } @@ -208,7 +209,7 @@ public class AlarmService extends AbstractService implements LensService { try { scheduler.pauseJob(JobKey.jobKey(jobHandle.getHandleIdString(), LENS_JOBS)); } catch (SchedulerException e) { - log.error("Failed to pause alarm triggers for job with jobHandle: " + jobHandle, e); + log.error("Failed to pause alarm triggers for job with jobHandle: {}", jobHandle); throw new LensException("Failed to pause alarm triggers for job with jobHandle: " + jobHandle, e); } } @@ -217,7 +218,7 @@ public class AlarmService extends AbstractService implements LensService { try { scheduler.resumeJob(JobKey.jobKey(jobHandle.getHandleIdString(), LENS_JOBS)); } catch (SchedulerException e) { - log.error("Failed to resume alarm triggers for job with jobHandle: " + jobHandle, e); + log.error("Failed to resume alarm triggers for job with jobHandle: {}", jobHandle); throw new LensException("Failed to resume alarm triggers for job with jobHandle: " + jobHandle, e); } } @@ -230,17 +231,17 @@ public class AlarmService extends AbstractService implements LensService { DateTime nominalTime = new DateTime(jobExecutionContext.getScheduledFireTime()); SchedulerJobHandle jobHandle = SchedulerJobHandle.fromString(data.getString("jobHandle")); SchedulerAlarmEvent alarmEvent = new SchedulerAlarmEvent(jobHandle, nominalTime, - SchedulerAlarmEvent.EventType.SCHEDULE, null); + SchedulerAlarmEvent.EventType.SCHEDULE, null); try { LensEventService eventService = LensServices.get().getService(LensEventService.NAME); eventService.notifyEvent(alarmEvent); if (jobExecutionContext.getNextFireTime() == null) { eventService - .notifyEvent(new SchedulerAlarmEvent(jobHandle, nominalTime, SchedulerAlarmEvent.EventType.EXPIRE, null)); + .notifyEvent(new SchedulerAlarmEvent(jobHandle, nominalTime, SchedulerAlarmEvent.EventType.EXPIRE, null)); } } catch (LensException e) { log.error("Failed to notify SchedulerAlarmEvent for jobHandle: {} and scheduleTime: {}", - jobHandle.getHandleIdString(), nominalTime.toString(), e); + jobHandle.getHandleIdString(), nominalTime.toString()); throw new JobExecutionException("Failed to notify alarmEvent", e); } } http://git-wip-us.apache.org/repos/asf/lens/blob/ebf018f9/lens-server/src/main/java/org/apache/lens/server/scheduler/ScheduleResource.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/ScheduleResource.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/ScheduleResource.java index 8603edf..7a0b485 100644 --- a/lens-server/src/main/java/org/apache/lens/server/scheduler/ScheduleResource.java +++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/ScheduleResource.java @@ -23,6 +23,7 @@ import java.util.List; import javax.ws.rs.*; import javax.ws.rs.core.MediaType; +import javax.xml.bind.JAXBElement; import org.apache.lens.api.APIResult; import org.apache.lens.api.LensSessionHandle; @@ -39,6 +40,7 @@ import org.apache.commons.lang3.StringUtils; @Path("scheduler") @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) public class ScheduleResource { + private static final ObjectFactory OBJECT_FACTORY = new ObjectFactory(); public enum INSTANCE_ACTIONS { KILL, RERUN; @@ -94,21 +96,22 @@ public class ScheduleResource { @DefaultValue("user") @QueryParam("user") String user, @DefaultValue("-1") @QueryParam("start") long start, @DefaultValue("-1") @QueryParam("end") long end) throws LensException { - return getSchedulerService().getAllJobStats(sessionId, status, user, jobName, start, end); + validateSession(sessionId); + return getSchedulerService().getAllJobStats(status, user, jobName, start, end); } @GET @Path("jobs/{jobHandle}") - public XJob getJobDefinition(@QueryParam("sessionid") LensSessionHandle sessionId, + public JAXBElement<XJob> getJobDefinition(@QueryParam("sessionid") LensSessionHandle sessionId, @PathParam("jobHandle") SchedulerJobHandle jobHandle) throws LensException { - return getSchedulerService().getJobDefinition(sessionId, jobHandle); + return OBJECT_FACTORY.createJob(getSchedulerService().getJobDefinition(jobHandle)); } @DELETE @Path("jobs/{jobHandle}") public APIResult deleteJob(@QueryParam("sessionid") LensSessionHandle sessionId, - @QueryParam("jobHandle") SchedulerJobHandle jobHandle) throws LensException { + @PathParam("jobHandle") SchedulerJobHandle jobHandle) throws LensException { validateSession(sessionId); getSchedulerService().deleteJob(sessionId, jobHandle); return APIResult.success(); @@ -155,11 +158,11 @@ public class ScheduleResource { } @GET - @Path("jobs/{jobHandle}/stats") + @Path("jobs/{jobHandle}/info") public SchedulerJobInfo getJobDetails(@QueryParam("sessionid") LensSessionHandle sessionId, @PathParam("jobHandle") SchedulerJobHandle jobHandle) throws LensException { validateSession(sessionId); - return getSchedulerService().getJobDetails(sessionId, jobHandle); + return getSchedulerService().getJobDetails(jobHandle); } @GET @@ -168,7 +171,7 @@ public class ScheduleResource { @PathParam("jobHandle") SchedulerJobHandle jobHandle, @QueryParam("numResults") Long numResults) throws LensException { validateSession(sessionId); - return getSchedulerService().getJobInstances(sessionId, jobHandle, numResults); + return getSchedulerService().getJobInstances(jobHandle, numResults); } @GET @@ -177,34 +180,33 @@ public class ScheduleResource { @PathParam("instanceHandle") SchedulerJobInstanceHandle instanceHandle) throws LensException { validateSession(sessionId); - return getSchedulerService().getInstanceDetails(sessionId, instanceHandle); + return getSchedulerService().getInstanceDetails(instanceHandle); } @POST @Path("instances/{instanceHandle}") public APIResult updateInstance(@QueryParam("sessionid") LensSessionHandle sessionId, - @PathParam("instanceHandle") SchedulerJobInstanceHandle instanceHandle, - @QueryParam("action") INSTANCE_ACTIONS action) throws LensException { + @PathParam("instanceHandle") SchedulerJobInstanceHandle instanceHandle, + @QueryParam("action") INSTANCE_ACTIONS action) throws LensException { + APIResult res = null; validateSession(sessionId); - - APIResult res; switch (action) { case KILL: if (getSchedulerService().killInstance(sessionId, instanceHandle)) { res = new APIResult(APIResult.Status.SUCCEEDED, - "Killing the instance with id " + instanceHandle + " was successful"); + "Killing the instance with id " + instanceHandle + " was successful"); } else { res = new APIResult(APIResult.Status.FAILED, - "Killing the instance with id " + instanceHandle + " was not successful"); + "Killing the instance with id " + instanceHandle + " was not successful"); } break; case RERUN: if (getSchedulerService().rerunInstance(sessionId, instanceHandle)) { res = new APIResult(APIResult.Status.SUCCEEDED, - "Rerunning the instance with id " + instanceHandle + " was successful"); + "Rerunning the instance with id " + instanceHandle + " was successful"); } else { res = new APIResult(APIResult.Status.FAILED, - "Rerunning the instance with id " + instanceHandle + " was not successful"); + "Rerunning the instance with id " + instanceHandle + " was not successful"); } break; default: http://git-wip-us.apache.org/repos/asf/lens/blob/ebf018f9/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java index 7a2b06a..6a4c77b 100644 --- a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java +++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java @@ -286,7 +286,7 @@ public class SchedulerDAO { protected static final String COLUMN_USER = "username"; protected static final String COLUMN_STATUS = "status"; protected static final String COLUMN_CREATED_ON = "createdon"; - protected static final String COLUMN_SCHEDULE_TIME = "schedultime"; + protected static final String COLUMN_SCHEDULE_TIME = "scheduledtime"; protected static final String COLUMN_MODIFIED_ON = "modifiedon"; protected static final String COLUMN_JOB_ID = "jobid"; protected static final String COLUMN_SESSION_HANDLE = "sessionhandle"; @@ -376,8 +376,8 @@ public class SchedulerDAO { public int insertIntoJobInstanceRunTable(SchedulerJobInstanceRun instanceRun) throws SQLException { String insetSQL = "INSERT INTO " + JOB_INSTANCE_RUN_TABLE + " VALUES(?,?,?,?,?,?,?,?)"; return runner.update(insetSQL, instanceRun.getHandle().getHandleIdString(), instanceRun.getRunId(), - instanceRun.getSessionHandle().toString(), instanceRun.getStartTime(), instanceRun.getEndTime(), - instanceRun.getResultPath(), + instanceRun.getSessionHandle() == null ? "" : instanceRun.getSessionHandle().toString(), + instanceRun.getStartTime(), instanceRun.getEndTime(), instanceRun.getResultPath(), instanceRun.getQueryHandle() == null ? "" : instanceRun.getQueryHandle().getHandleIdString(), instanceRun.getInstanceState().name()); } @@ -665,7 +665,7 @@ public class SchedulerDAO { "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_RUN_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL, " + COLUMN_RUN_ID + " INT NOT NULL, " + COLUMN_SESSION_HANDLE + " VARCHAR(255), " + COLUMN_START_TIME + " BIGINT, " + COLUMN_END_TIME + " BIGINT, " + COLUMN_RESULT_PATH + " TEXT, " + COLUMN_QUERY_HANDLE - + " TEXT, " + COLUMN_STATUS + " VARCHAR(20), " + " PRIMARY KEY ( " + COLUMN_ID + ", " + COLUMN_RUN_ID + + " VARCHAR(255), " + COLUMN_STATUS + " VARCHAR(20), " + " PRIMARY KEY ( " + COLUMN_ID + ", " + COLUMN_RUN_ID + ")" + ")"; runner.update(createSQL); } @@ -709,7 +709,7 @@ public class SchedulerDAO { "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_RUN_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL, " + COLUMN_RUN_ID + " INT NOT NULL, " + COLUMN_SESSION_HANDLE + " VARCHAR(255), " + COLUMN_START_TIME + " BIGINT, " + COLUMN_END_TIME + " BIGINT, " + COLUMN_RESULT_PATH + " VARCHAR(1024)," - + COLUMN_QUERY_HANDLE + " VARCHAR(1024), " + COLUMN_STATUS + " VARCHAR(20), " + " PRIMARY KEY ( " + + COLUMN_QUERY_HANDLE + " VARCHAR(255), " + COLUMN_STATUS + " VARCHAR(20), " + " PRIMARY KEY ( " + COLUMN_ID + ", " + COLUMN_RUN_ID + " )" + ")"; runner.update(createSQL); } http://git-wip-us.apache.org/repos/asf/lens/blob/ebf018f9/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerEventListener.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerEventListener.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerEventListener.java index 7323add..d1d4a68 100644 --- a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerEventListener.java +++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerEventListener.java @@ -19,9 +19,7 @@ package org.apache.lens.server.scheduler; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.lens.api.LensConf; import org.apache.lens.api.LensSessionHandle; @@ -33,9 +31,10 @@ import org.apache.lens.server.api.LensConfConstants; import org.apache.lens.server.api.error.LensException; import org.apache.lens.server.api.events.AsyncEventListener; import org.apache.lens.server.api.events.SchedulerAlarmEvent; +import org.apache.lens.server.api.metastore.CubeMetastoreService; import org.apache.lens.server.api.query.QueryExecutionService; import org.apache.lens.server.api.scheduler.SchedulerService; -import org.apache.lens.server.query.QueryExecutionServiceImpl; +import org.apache.lens.server.api.session.SessionService; import org.apache.lens.server.util.UtilityMethods; import org.joda.time.DateTime; @@ -55,14 +54,48 @@ public class SchedulerEventListener extends AsyncEventListener<SchedulerAlarmEve protected QueryExecutionService queryService; private SchedulerDAO schedulerDAO; private SchedulerService schedulerService; + private SessionService sessionService; + private CubeMetastoreService cubeMetastoreService; public SchedulerEventListener(SchedulerDAO schedulerDAO) { super(CORE_POOL_SIZE); this.queryService = LensServices.get().getService(QueryExecutionService.NAME); this.schedulerService = LensServices.get().getService(SchedulerService.NAME); + this.sessionService = LensServices.get().getService(SessionService.NAME); + this.cubeMetastoreService = LensServices.get().getService(CubeMetastoreService.NAME); this.schedulerDAO = schedulerDAO; } + private LensSessionHandle getSessionHandle(String user) throws LensException { + return schedulerService.openSessionAsUser(user); + } + + private void setSessionConf(LensSessionHandle sessionHandle, XJob job) throws LensException { + XExecution execution = job.getExecution(); + XSessionType executionSession = execution.getSession(); + cubeMetastoreService.setCurrentDatabase(sessionHandle, executionSession.getDb()); + List<MapType> sessionConfList = executionSession.getConf(); + for (MapType element : sessionConfList) { + sessionService.setSessionParameter(sessionHandle, element.getKey(), element.getValue()); + } + List<ResourcePath> resourceList = executionSession.getResourcePath(); + for (ResourcePath path : resourceList) { + sessionService.addResource(sessionHandle, path.getType(), path.getPath()); + } + } + + private LensConf getLensConf(XJob job, SchedulerJobInstanceHandle instanceHandle, DateTime scheduledTime) { + List<MapType> jobConf = job.getExecution().getQuery().getConf(); + LensConf queryConf = new LensConf(); + for (MapType element : jobConf) { + queryConf.addProperty(element.getKey(), element.getValue()); + } + queryConf.addProperty(JOB_INSTANCE_ID_KEY, instanceHandle.getHandleId()); + // Current time is used for resolving date. + queryConf.addProperty(LensConfConstants.QUERY_CURRENT_TIME_IN_MILLIS, scheduledTime.getMillis()); + return queryConf; + } + /** * @param event the event */ @@ -70,14 +103,6 @@ public class SchedulerEventListener extends AsyncEventListener<SchedulerAlarmEve public void process(SchedulerAlarmEvent event) { DateTime scheduledTime = event.getNominalTime(); SchedulerJobHandle jobHandle = event.getJobHandle(); - if (event.getType() == SchedulerAlarmEvent.EventType.EXPIRE) { - try { - schedulerService.expireJob(null, jobHandle); - } catch (LensException e) { - log.error("Error while expiring the job", e); - } - return; - } /* * Get the job from the store. * Create an instance. @@ -86,75 +111,69 @@ public class SchedulerEventListener extends AsyncEventListener<SchedulerAlarmEve * If successfully submitted change the status to running. * Otherwise update the status to killed. */ - //TODO: Get the job status and if it is not Scheduled, don't do anything. XJob job = schedulerDAO.getJob(jobHandle); String user = schedulerDAO.getUser(jobHandle); SchedulerJobInstanceHandle instanceHandle = event.getPreviousInstance() == null ? UtilityMethods.generateSchedulerJobInstanceHandle() : event.getPreviousInstance(); - Map<String, String> conf = new HashMap<>(); - LensSessionHandle sessionHandle = null; - try { - // Open the session with the user who submitted the job. - sessionHandle = ((QueryExecutionServiceImpl) LensServices.get().getService(QueryExecutionServiceImpl.NAME)) - .openSession(user, "dummy", conf, false); - } catch (LensException e) { - log.error("Error occurred while opening a session ", e); - return; - } SchedulerJobInstanceInfo instance = null; SchedulerJobInstanceRun run = null; - // Session needs to be closed after the launch. + LensSessionHandle sessionHandle = null; + try { - long scheduledTimeMillis = scheduledTime.getMillis(); - String query = job.getExecution().getQuery().getQuery(); - List<MapType> jobConf = job.getExecution().getQuery().getConf(); - LensConf queryConf = new LensConf(); - for (MapType element : jobConf) { - queryConf.addProperty(element.getKey(), element.getValue()); + sessionHandle = getSessionHandle(user); + setSessionConf(sessionHandle, job); + if (event.getType() == SchedulerAlarmEvent.EventType.EXPIRE) { + try { + log.info("Expiring job with handle {}", jobHandle); + schedulerService.expireJob(sessionHandle, jobHandle); + } catch (LensException e) { + log.error("Error while expiring the job", e); + } + return; } - queryConf.addProperty(JOB_INSTANCE_ID_KEY, instanceHandle.getHandleId()); - // Current time is used for resolving date. - queryConf.addProperty(LensConfConstants.QUERY_CURRENT_TIME_IN_MILLIS, scheduledTime.getMillis()); - String queryName = job.getName(); - queryName += "-" + scheduledTime.getMillis(); + long scheduledTimeMillis = scheduledTime.getMillis(); // If the instance is new then create otherwise get from the store if (event.getPreviousInstance() == null) { instance = new SchedulerJobInstanceInfo(instanceHandle, jobHandle, scheduledTimeMillis, - new ArrayList<SchedulerJobInstanceRun>()); + new ArrayList<SchedulerJobInstanceRun>()); + // Store the instance + if (schedulerDAO.storeJobInstance(instance) != 1) { + log.error("Store was unsuccessful for instance {} of job {} ", instanceHandle, jobHandle); + return; + } } else { instance = schedulerDAO.getSchedulerJobInstanceInfo(instanceHandle); } // Next run of the instance long currentTime = System.currentTimeMillis(); - run = new SchedulerJobInstanceRun(instanceHandle, instance.getInstanceRunList().size() + 1, sessionHandle, - currentTime, 0, "N/A", null, SchedulerJobInstanceState.WAITING); + run = new SchedulerJobInstanceRun(instanceHandle, instance.getInstanceRunList().size() + 1, null, currentTime, 0, + "N/A", null, SchedulerJobInstanceState.WAITING); instance.getInstanceRunList().add(run); - boolean success; - if (event.getPreviousInstance() == null) { - success = schedulerDAO.storeJobInstance(instance) == 1; - if (!success) { - log.error( - "Exception occurred while storing the instance for instance handle " + instance + " of job " + jobHandle); - return; - } - } - success = schedulerDAO.storeJobInstanceRun(run) == 1; - if (!success) { - log.error( - "Exception occurred while storing the instance for instance handle " + instance + " of job " + jobHandle); + if (schedulerDAO.storeJobInstanceRun(run) != 1) { + log.error("Exception occurred while storing the instance run for instance handle {} of job {}", instance, + jobHandle); return; } - + run.setSessionHandle(sessionHandle); + LensConf queryConf = getLensConf(job, instanceHandle, scheduledTime); + // Query Launch + String query = job.getExecution().getQuery().getQuery(); + String queryName = job.getName(); + queryName += "-" + scheduledTimeMillis; QueryHandle handle = queryService.executeAsync(sessionHandle, query, queryConf, queryName); + log.info("Running instance {} of job {} with run {} with query handle {}", instanceHandle, jobHandle, + run.getRunId(), handle); run.setQueryHandle(handle); run.setInstanceState(run.getInstanceState().nextTransition(SchedulerJobInstanceEvent.ON_RUN)); run.setEndTime(System.currentTimeMillis()); + // Update run schedulerDAO.updateJobInstanceRun(run); + log.info("Successfully updated instance run with instance {} of job {}", instanceHandle, jobHandle); } catch (LensException | InvalidStateTransitionException e) { log.error( - "Exception occurred while launching the job instance for " + jobHandle + " for nominal time " + scheduledTime - .getMillis(), e); + "Exception occurred while launching the job instance for " + jobHandle + " for nominal time " + scheduledTime + .getMillis(), e); try { run.setInstanceState(run.getInstanceState().nextTransition(SchedulerJobInstanceEvent.ON_FAILURE)); run.setEndTime(System.currentTimeMillis()); @@ -163,9 +182,9 @@ public class SchedulerEventListener extends AsyncEventListener<SchedulerAlarmEve log.error("Can't make transition for instance " + instance.getId() + " of job " + instance.getJobId(), e); } } finally { + // Session needs to be closed after the launch. try { - ((QueryExecutionServiceImpl) LensServices.get().getService(QueryExecutionServiceImpl.NAME)) - .closeSession(sessionHandle); + sessionService.closeSession(sessionHandle); } catch (LensException e) { log.error("Error closing session ", e); } http://git-wip-us.apache.org/repos/asf/lens/blob/ebf018f9/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java index 5b12720..077d531 100644 --- a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java +++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java @@ -56,10 +56,10 @@ public class SchedulerQueryEventListener extends AsyncEventListener<QueryEnded> return; } SchedulerJobInstanceInfo info = schedulerDAO - .getSchedulerJobInstanceInfo(SchedulerJobInstanceHandle.fromString(instanceHandle)); + .getSchedulerJobInstanceInfo(SchedulerJobInstanceHandle.fromString(instanceHandle)); List<SchedulerJobInstanceRun> runList = info.getInstanceRunList(); if (runList.size() == 0) { - log.error("No instance run for " + instanceHandle + " with query " + queryContext.getQueryHandle()); + log.error("No instance run for {} with query {}", instanceHandle, queryContext.getQueryHandle()); return; } SchedulerJobInstanceRun latestRun = runList.get(runList.size() - 1); @@ -78,8 +78,10 @@ public class SchedulerQueryEventListener extends AsyncEventListener<QueryEnded> } latestRun.setEndTime(System.currentTimeMillis()); latestRun.setInstanceState(state); - latestRun.setResultPath(queryContext.getDriverResultPath()); + latestRun.setResultPath(queryContext.getResultSetPath()); schedulerDAO.updateJobInstanceRun(latestRun); + log.info("Updated instance run {} for instance {} for job {} to {}", latestRun.getRunId(), info.getId(), + info.getJobId(), state); } catch (InvalidStateTransitionException e) { log.error("Instance Transition Failed ", e); } http://git-wip-us.apache.org/repos/asf/lens/blob/ebf018f9/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java index 14ca32d..28d7e27 100644 --- a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java @@ -19,6 +19,7 @@ package org.apache.lens.server.scheduler; import java.util.Collection; +import java.util.HashMap; import java.util.List; import org.apache.lens.api.LensConf; @@ -94,6 +95,12 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe } } + private void doesSessionBelongToUser(LensSessionHandle sessionHandle, String user) throws LensException { + LensSessionImpl session = getSession(sessionHandle); + if (!session.getLoggedInUser().equals(user)) + throw new LensException("Logged in user " + session.getLoggedInUser() + " is not same as " + user); + } + @Override public synchronized void start() { super.start(); @@ -113,6 +120,15 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe * {@inheritDoc} */ @Override + public LensSessionHandle openSessionAsUser(String user) throws LensException { + // Open session with no auth + return openSession(user, "Mimbulus Mimbletonia", new HashMap<String, String>(), false); + } + + /** + * {@inheritDoc} + */ + @Override public SchedulerJobHandle submitJob(LensSessionHandle sessionHandle, XJob job) throws LensException { LensSessionImpl session = getSession(sessionHandle); // Validate XJob @@ -121,8 +137,9 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe long createdOn = System.currentTimeMillis(); long modifiedOn = createdOn; SchedulerJobInfo info = new SchedulerJobInfo(handle, job, session.getLoggedInUser(), SchedulerJobState.NEW, - createdOn, modifiedOn); + createdOn, modifiedOn); if (schedulerDAO.storeJob(info) == 1) { + log.info("Successfully submitted job with handle {}", handle); return handle; } else { throw new LensException("Could not Submit the job"); @@ -138,6 +155,7 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe @Override public boolean scheduleJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException { SchedulerJobInfo jobInfo = schedulerDAO.getSchedulerJobInfo(jobHandle); + doesSessionBelongToUser(sessionHandle, jobInfo.getUserName()); XJob job = jobInfo.getJob(); DateTime start = new DateTime(job.getStartTime().toGregorianCalendar().getTime()); DateTime end = new DateTime(job.getEndTime().toGregorianCalendar().getTime()); @@ -145,7 +163,8 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe // check query checkQuery(sessionHandle, job); alarmService.schedule(start, end, frequency, jobHandle.getHandleIdString()); - return setStateOfJob(jobHandle, SchedulerJobEvent.ON_SCHEDULE) == 1; + log.info("Successfully scheduled job with handle {} in AlarmService", jobHandle); + return setStateOfJob(jobInfo, SchedulerJobEvent.ON_SCHEDULE) == 1; } private void checkQuery(LensSessionHandle sessionHandle, XJob job) throws LensException { @@ -156,7 +175,7 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe } queryConf.addProperty(CubeQueryConfUtil.FAIL_QUERY_ON_PARTIAL_DATA, false); queryService.estimate(LensServices.get().getLogSegregationContext().getLogSegragationId(), sessionHandle, - job.getExecution().getQuery().getQuery(), queryConf); + job.getExecution().getQuery().getQuery(), queryConf); return; } @@ -171,7 +190,7 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe * {@inheritDoc} */ @Override - public XJob getJobDefinition(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException { + public XJob getJobDefinition(SchedulerJobHandle jobHandle) throws LensException { return schedulerDAO.getJob(jobHandle); } @@ -179,8 +198,7 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe * {@inheritDoc} */ @Override - public SchedulerJobInfo getJobDetails(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) - throws LensException { + public SchedulerJobInfo getJobDetails(SchedulerJobHandle jobHandle) throws LensException { return schedulerDAO.getSchedulerJobInfo(jobHandle); } @@ -191,8 +209,8 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe public boolean updateJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle, XJob newJobDefinition) throws LensException { SchedulerJobInfo jobInfo = schedulerDAO.getSchedulerJobInfo(jobHandle); + doesSessionBelongToUser(sessionHandle, jobInfo.getUserName()); // This will allow only the job definition and configuration change. - // TODO: fix start and end time changes jobInfo.setJob(newJobDefinition); jobInfo.setModifiedOn(System.currentTimeMillis()); int updated = schedulerDAO.updateJob(jobInfo); @@ -204,10 +222,13 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe */ @Override public boolean expireJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException { + SchedulerJobInfo info = schedulerDAO.getSchedulerJobInfo(jobHandle); + doesSessionBelongToUser(sessionHandle, info.getUserName()); if (alarmService.checkExists(jobHandle)) { alarmService.unSchedule(jobHandle); + log.info("Successfully unscheduled the job with handle {} in AlarmService ", jobHandle); } - return setStateOfJob(jobHandle, SchedulerJobEvent.ON_EXPIRE) == 1; + return setStateOfJob(info, SchedulerJobEvent.ON_EXPIRE) == 1; } /** @@ -215,8 +236,10 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe */ @Override public boolean suspendJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException { + SchedulerJobInfo info = schedulerDAO.getSchedulerJobInfo(jobHandle); + doesSessionBelongToUser(sessionHandle, info.getUserName()); alarmService.pauseJob(jobHandle); - return setStateOfJob(jobHandle, SchedulerJobEvent.ON_SUSPEND) == 1; + return setStateOfJob(info, SchedulerJobEvent.ON_SUSPEND) == 1; } /** @@ -224,8 +247,10 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe */ @Override public boolean resumeJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException { + SchedulerJobInfo info = schedulerDAO.getSchedulerJobInfo(jobHandle); + doesSessionBelongToUser(sessionHandle, info.getUserName()); alarmService.resumeJob(jobHandle); - return setStateOfJob(jobHandle, SchedulerJobEvent.ON_RESUME) == 1; + return setStateOfJob(info, SchedulerJobEvent.ON_RESUME) == 1; } /** @@ -233,18 +258,21 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe */ @Override public boolean deleteJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException { + SchedulerJobInfo info = schedulerDAO.getSchedulerJobInfo(jobHandle); + doesSessionBelongToUser(sessionHandle, info.getUserName()); if (alarmService.checkExists(jobHandle)) { alarmService.unSchedule(jobHandle); + log.info("Successfully unscheduled the job with handle {} ", jobHandle); } - return setStateOfJob(jobHandle, SchedulerJobEvent.ON_DELETE) == 1; + return setStateOfJob(info, SchedulerJobEvent.ON_DELETE) == 1; } /** * {@inheritDoc} */ @Override - public Collection<SchedulerJobStats> getAllJobStats(LensSessionHandle sessionHandle, String state, String user, - String jobName, long startTime, long endTime) throws LensException { + public Collection<SchedulerJobStats> getAllJobStats(String state, String user, String jobName, long startTime, + long endTime) throws LensException { return null; } @@ -252,8 +280,8 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe * {@inheritDoc} */ @Override - public SchedulerJobStats getJobStats(LensSessionHandle sessionHandle, SchedulerJobHandle handle, String state, - long startTime, long endTime) throws LensException { + public SchedulerJobStats getJobStats(SchedulerJobHandle handle, String state, long startTime, long endTime) + throws LensException { return null; } @@ -264,6 +292,7 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe public boolean rerunInstance(LensSessionHandle sessionHandle, SchedulerJobInstanceHandle instanceHandle) throws LensException { SchedulerJobInstanceInfo instanceInfo = schedulerDAO.getSchedulerJobInstanceInfo(instanceHandle); + doesSessionBelongToUser(sessionHandle, schedulerDAO.getUser(instanceInfo.getJobId())); if (schedulerDAO.getJobState(instanceInfo.getJobId()) != SchedulerJobState.SCHEDULED) { throw new LensException("Job with handle " + instanceInfo.getJobId() + " is not scheduled"); } @@ -277,8 +306,9 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe try { latestRun.getInstanceState().nextTransition(SchedulerJobInstanceEvent.ON_RERUN); getEventService().notifyEvent( - new SchedulerAlarmEvent(instanceInfo.getJobId(), new DateTime(instanceInfo.getScheduleTime()), - SchedulerAlarmEvent.EventType.SCHEDULE, instanceHandle)); + new SchedulerAlarmEvent(instanceInfo.getJobId(), new DateTime(instanceInfo.getScheduleTime()), + SchedulerAlarmEvent.EventType.SCHEDULE, instanceHandle)); + log.info("Rerunning the instance with {} for job {} ", instanceHandle, instanceInfo.getJobId()); } catch (InvalidStateTransitionException e) { throw new LensException("Invalid State Transition ", e); } @@ -289,8 +319,8 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe * {@inheritDoc} */ @Override - public List<SchedulerJobInstanceInfo> getJobInstances(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle, - Long numResults) throws LensException { + public List<SchedulerJobInstanceInfo> getJobInstances(SchedulerJobHandle jobHandle, Long numResults) + throws LensException { return schedulerDAO.getJobInstances(jobHandle); } @@ -301,15 +331,28 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe * Get the query handle from the latest run. */ SchedulerJobInstanceInfo instanceInfo = schedulerDAO.getSchedulerJobInstanceInfo(instanceHandle); + doesSessionBelongToUser(sessionHandle, schedulerDAO.getUser(instanceInfo.getJobId())); List<SchedulerJobInstanceRun> runList = instanceInfo.getInstanceRunList(); if (runList.size() == 0) { throw new LensException("Job instance " + instanceHandle + " is not yet run"); } SchedulerJobInstanceRun latestRun = runList.get(runList.size() - 1); QueryHandle handle = latestRun.getQueryHandle(); - if (handle.getHandleIdString().isEmpty()) { - return false; + if (handle == null || handle.getHandleIdString().isEmpty()) { + SchedulerJobInstanceState state = latestRun.getInstanceState(); + try { + state = state.nextTransition(SchedulerJobInstanceEvent.ON_KILL); + } catch (InvalidStateTransitionException e) { + throw new LensException("Invalid Transition of state ", e); + } + latestRun.setEndTime(System.currentTimeMillis()); + latestRun.setInstanceState(state); + schedulerDAO.updateJobInstanceRun(latestRun); + log.info("Killing instance with {} for job {} ", instanceHandle, instanceInfo.getJobId()); + return true; } + log.info("Killing instance with {} for job {} with query handle {} ", instanceHandle, instanceInfo.getJobId(), + handle); // This will cause the QueryEnd event which will set the status of the instance to KILLED. return queryService.cancelQuery(sessionHandle, handle); } @@ -318,19 +361,22 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe * {@inheritDoc} */ @Override - public SchedulerJobInstanceInfo getInstanceDetails(LensSessionHandle sessionHandle, - SchedulerJobInstanceHandle instanceHandle) throws LensException { + public SchedulerJobInstanceInfo getInstanceDetails(SchedulerJobInstanceHandle instanceHandle) throws LensException { return schedulerDAO.getSchedulerJobInstanceInfo(instanceHandle); } - private int setStateOfJob(SchedulerJobHandle handle, SchedulerJobEvent event) throws LensException { + private int setStateOfJob(SchedulerJobInfo info, SchedulerJobEvent event) throws LensException { try { - SchedulerJobInfo info = schedulerDAO.getSchedulerJobInfo(handle); SchedulerJobState currentState = info.getJobState(); SchedulerJobState nextState = currentState.nextTransition(event); info.setJobState(nextState); info.setModifiedOn(System.currentTimeMillis()); - return schedulerDAO.updateJobStatus(info); + int ret = schedulerDAO.updateJobStatus(info); + if (ret == 1) { + log.info("Successfully changed the status of job with handle {} from {} to {}", info.getId(), currentState, + nextState); + } + return ret; } catch (InvalidStateTransitionException e) { throw new LensException("Invalid state ", e); } http://git-wip-us.apache.org/repos/asf/lens/blob/ebf018f9/lens-server/src/test/java/org/apache/lens/server/scheduler/AlarmServiceTest.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/scheduler/AlarmServiceTest.java b/lens-server/src/test/java/org/apache/lens/server/scheduler/AlarmServiceTest.java index 7b610de..02f371c 100644 --- a/lens-server/src/test/java/org/apache/lens/server/scheduler/AlarmServiceTest.java +++ b/lens-server/src/test/java/org/apache/lens/server/scheduler/AlarmServiceTest.java @@ -159,9 +159,10 @@ public class AlarmServiceTest { SchedulerJobHandle jobHandle = new SchedulerJobHandle(UUID.randomUUID()); System.out.println("jobHandle = " + jobHandle); XFrequency frequency = new XFrequency(); - frequency.setCronExpression("0/1 * * * * ?"); + frequency.setCronExpression("0 0 12 * * ?"); alarmService.schedule(start, end, frequency, jobHandle.toString()); - Thread.sleep(2000); + Thread.sleep(1000); + alarmService.unSchedule(jobHandle); // Assert that the events are fired and at per second interval. assertTrue(events.size() > 1); } http://git-wip-us.apache.org/repos/asf/lens/blob/ebf018f9/lens-server/src/test/java/org/apache/lens/server/scheduler/TestSchedulerServiceImpl.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/scheduler/TestSchedulerServiceImpl.java b/lens-server/src/test/java/org/apache/lens/server/scheduler/TestSchedulerServiceImpl.java index ce744af..e182c72 100644 --- a/lens-server/src/test/java/org/apache/lens/server/scheduler/TestSchedulerServiceImpl.java +++ b/lens-server/src/test/java/org/apache/lens/server/scheduler/TestSchedulerServiceImpl.java @@ -22,7 +22,6 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import java.util.GregorianCalendar; -import java.util.HashMap; import java.util.List; import java.util.UUID; @@ -42,7 +41,6 @@ import org.apache.lens.server.api.query.QueryContext; import org.apache.lens.server.api.query.QueryEnded; import org.apache.lens.server.api.query.QueryExecutionService; import org.apache.lens.server.api.scheduler.SchedulerService; -import org.apache.lens.server.query.QueryExecutionServiceImpl; import org.apache.hadoop.conf.Configuration; @@ -59,8 +57,7 @@ public class TestSchedulerServiceImpl { SchedulerServiceImpl scheduler; EventServiceImpl eventService; - LensSessionHandle sessionHandle = null; - + String user = "someuser"; @BeforeMethod public void setup() throws Exception { System.setProperty(LensConfConstants.CONFIG_LOCATION, "target/test-classes/"); @@ -69,19 +66,20 @@ public class TestSchedulerServiceImpl { private void setupQueryService() throws Exception { QueryExecutionService queryExecutionService = PowerMockito.mock(QueryExecutionService.class); scheduler.setQueryService(queryExecutionService); + PowerMockito.when( + scheduler.getQueryService().estimate(anyString(), any(LensSessionHandle.class), anyString(), any(LensConf.class))) + .thenReturn(null); PowerMockito.when(scheduler.getQueryService() - .estimate(anyString(), any(LensSessionHandle.class), anyString(), any(LensConf.class))).thenReturn(null); - PowerMockito.when(scheduler.getQueryService() - .executeAsync(any(LensSessionHandle.class), anyString(), any(LensConf.class), anyString())) - .thenReturn(new QueryHandle(UUID.randomUUID())); + .executeAsync(any(LensSessionHandle.class), anyString(), any(LensConf.class), anyString())) + .thenReturn(new QueryHandle(UUID.randomUUID())); PowerMockito.when(scheduler.getQueryService().cancelQuery(any(LensSessionHandle.class), any(QueryHandle.class))) - .thenReturn(true); + .thenReturn(true); scheduler.getSchedulerEventListener().setQueryService(queryExecutionService); } private QueryEnded mockQueryEnded(SchedulerJobInstanceHandle instanceHandle, QueryStatus.Status status) { QueryContext mockContext = PowerMockito.mock(QueryContext.class); - PowerMockito.when(mockContext.getDriverResultPath()).thenReturn("/tmp/query1/result"); + PowerMockito.when(mockContext.getResultSetPath()).thenReturn("/tmp/query1/result"); Configuration conf = new Configuration(); // set the instance handle conf.set("job_instance_key", instanceHandle.getHandleIdString()); @@ -101,8 +99,7 @@ public class TestSchedulerServiceImpl { scheduler = LensServices.get().getService(SchedulerService.NAME); eventService = LensServices.get().getService(EventServiceImpl.NAME); setupQueryService(); - sessionHandle = ((QueryExecutionServiceImpl) LensServices.get().getService(QueryExecutionService.NAME)) - .openSession("someuser", "test", new HashMap<String, String>(), false); + LensSessionHandle sessionHandle = scheduler.openSessionAsUser(user); long currentTime = System.currentTimeMillis(); XJob job = getTestJob("0/5 * * * * ?", currentTime, currentTime + 15000); SchedulerJobHandle jobHandle = scheduler.submitAndScheduleJob(sessionHandle, job); @@ -119,16 +116,18 @@ public class TestSchedulerServiceImpl { Thread.sleep(2000); // Check the instance value SchedulerJobInstanceInfo info = scheduler.getSchedulerDAO() - .getSchedulerJobInstanceInfo(instanceHandleList.get(0).getId()); + .getSchedulerJobInstanceInfo(instanceHandleList.get(0).getId()); Assert.assertEquals(info.getInstanceRunList().size(), 1); Assert.assertEquals(info.getInstanceRunList().get(0).getResultPath(), "/tmp/query1/result"); Assert.assertEquals(info.getInstanceRunList().get(0).getInstanceState(), SchedulerJobInstanceState.SUCCEEDED); + scheduler.closeSession(sessionHandle); } @Test(priority = 2) public void testSuspendResume() throws Exception { long currentTime = System.currentTimeMillis(); XJob job = getTestJob("0/10 * * * * ?", currentTime, currentTime + 180000); + LensSessionHandle sessionHandle = scheduler.openSessionAsUser(user); SchedulerJobHandle jobHandle = scheduler.submitAndScheduleJob(sessionHandle, job); Assert.assertNotNull(jobHandle); Assert.assertTrue(scheduler.suspendJob(sessionHandle, jobHandle)); @@ -138,6 +137,7 @@ public class TestSchedulerServiceImpl { Thread.sleep(10000); Assert.assertTrue(scheduler.expireJob(sessionHandle, jobHandle)); Assert.assertEquals(scheduler.getSchedulerDAO().getJobState(jobHandle), SchedulerJobState.EXPIRED); + scheduler.closeSession(sessionHandle); } @Test(priority = 2) @@ -145,6 +145,7 @@ public class TestSchedulerServiceImpl { long currentTime = System.currentTimeMillis(); XJob job = getTestJob("0/10 * * * * ?", currentTime, currentTime + 180000); + LensSessionHandle sessionHandle = scheduler.openSessionAsUser(user); SchedulerJobHandle jobHandle = scheduler.submitAndScheduleJob(sessionHandle, job); // Wait for some instances. Thread.sleep(15000); @@ -153,7 +154,7 @@ public class TestSchedulerServiceImpl { eventService.notifyEvent(mockQueryEnded(instanceHandleList.get(0).getId(), QueryStatus.Status.FAILED)); Thread.sleep(1000); SchedulerJobInstanceInfo info = scheduler.getSchedulerDAO() - .getSchedulerJobInstanceInfo(instanceHandleList.get(0).getId()); + .getSchedulerJobInstanceInfo(instanceHandleList.get(0).getId()); // First run Assert.assertEquals(info.getInstanceRunList().size(), 1); Assert.assertEquals(info.getInstanceRunList().get(0).getInstanceState(), SchedulerJobInstanceState.FAILED); @@ -170,13 +171,14 @@ public class TestSchedulerServiceImpl { Assert.assertEquals(info.getInstanceRunList().get(1).getInstanceState(), SchedulerJobInstanceState.SUCCEEDED); Assert.assertTrue(scheduler.expireJob(sessionHandle, jobHandle)); Assert.assertEquals(scheduler.getSchedulerDAO().getJobState(jobHandle), SchedulerJobState.EXPIRED); + scheduler.closeSession(sessionHandle); } @Test(priority = 2) public void testKillRunningInstance() throws Exception { long currentTime = System.currentTimeMillis(); - XJob job = getTestJob("0/5 * * * * ?", currentTime, currentTime + 180000); + LensSessionHandle sessionHandle = scheduler.openSessionAsUser(user); SchedulerJobHandle jobHandle = scheduler.submitAndScheduleJob(sessionHandle, job); // Let it run Thread.sleep(6000); @@ -184,7 +186,7 @@ public class TestSchedulerServiceImpl { Assert.assertTrue(scheduler.killInstance(sessionHandle, instanceHandleList.get(0).getId())); Thread.sleep(2000); SchedulerJobInstanceInfo info = scheduler.getSchedulerDAO() - .getSchedulerJobInstanceInfo(instanceHandleList.get(0).getId()); + .getSchedulerJobInstanceInfo(instanceHandleList.get(0).getId()); Assert.assertEquals(info.getInstanceRunList().size(), 1); Assert.assertEquals(info.getInstanceRunList().get(0).getInstanceState(), SchedulerJobInstanceState.RUNNING); // Query End event @@ -194,6 +196,7 @@ public class TestSchedulerServiceImpl { Assert.assertEquals(info.getInstanceRunList().get(0).getInstanceState(), SchedulerJobInstanceState.KILLED); Assert.assertTrue(scheduler.expireJob(sessionHandle, jobHandle)); Assert.assertEquals(scheduler.getSchedulerDAO().getJobState(jobHandle), SchedulerJobState.EXPIRED); + scheduler.closeSession(sessionHandle); } private XTrigger getTestTrigger(String cron) { @@ -211,7 +214,7 @@ public class TestSchedulerServiceImpl { query.setQuery("select ID from test_table"); execution.setQuery(query); XSessionType sessionType = new XSessionType(); - sessionType.setDb("test"); + sessionType.setDb("default"); execution.setSession(sessionType); return execution; }