Repository: lens Updated Branches: refs/heads/master 4bcb7aa96 -> 265c4667a
LENS-1184 : Add REST API for the scheduler Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/265c4667 Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/265c4667 Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/265c4667 Branch: refs/heads/master Commit: 265c4667af0dd7850892d0180c9ee72bb40b3e83 Parents: 4bcb7aa Author: Ajay Yadava <ajayyad...@apache.org> Authored: Tue Jul 5 10:16:07 2016 +0530 Committer: Amareshwari Sriramadasu <amareshw...@apache.org> Committed: Tue Jul 5 10:16:07 2016 +0530 ---------------------------------------------------------------------- .../server/api/scheduler/SchedulerService.java | 25 ++- .../lens/server/scheduler/LensScheduler.java | 48 +++++ .../lens/server/scheduler/ScheduleResource.java | 181 ++++++++++++++++++- .../server/scheduler/SchedulerServiceImpl.java | 73 ++++++-- 4 files changed, 288 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/265c4667/lens-server-api/src/main/java/org/apache/lens/server/api/scheduler/SchedulerService.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/scheduler/SchedulerService.java b/lens-server-api/src/main/java/org/apache/lens/server/api/scheduler/SchedulerService.java index 700a255..d0af876 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/scheduler/SchedulerService.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/scheduler/SchedulerService.java @@ -19,18 +19,22 @@ package org.apache.lens.server.api.scheduler; import java.util.Collection; -import java.util.Date; import java.util.List; import org.apache.lens.api.LensSessionHandle; import org.apache.lens.api.scheduler.*; +import org.apache.lens.server.api.LensService; +import org.apache.lens.server.api.SessionValidator; import org.apache.lens.server.api.error.LensException; /** * Scheduler interface. */ -public interface SchedulerService { +public interface SchedulerService extends LensService, SessionValidator { + + /** The constant NAME */ + String NAME = "scheduler"; /** * Submit a job. @@ -104,11 +108,9 @@ public interface SchedulerService { * * @param sessionHandle handle for the current session. * @param jobHandle handle for the job - * @param expiryTime time after which the job shouldn't execute. * @throws LensException the lens exception */ - void expireJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle, - Date expiryTime) throws LensException; + void expireJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException; /** @@ -131,12 +133,10 @@ public interface SchedulerService { * * @param sessionHandle handle for the session. * @param jobHandle handle for the job - * @param effectiveTime time from which to resume the instances. * @return true if the job was resumed successfully, false otherwise. * @throws LensException the lens exception */ - boolean resumeJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle, - Date effectiveTime) throws LensException; + boolean resumeJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException; /** * Delete a job. @@ -155,16 +155,13 @@ public interface SchedulerService { * if null no entries will be removed from result * @param user filter for user who submitted the job, if specified only jobs submitted by the given user * will be returned, if not specified no entries will be removed from result on basis of userName - * @param jobName filter for jobName, if specified only the jobs with name same as given name will be considered - * , else no jobs will be filtered out on the basis of name. * @param startTime if specified only instances with scheduleTime after this time will be considered. * @param endTime if specified only instances with scheduleTime before this time will be considered. * @return A collection of stats per job * @throws LensException */ Collection<SchedulerJobStats> getAllJobStats(LensSessionHandle sessionHandle, - String state, String user, - String jobName, long startTime, long endTime) throws LensException; + String state, String user, long startTime, long endTime) throws LensException; /** * Returns stats for a job. @@ -187,10 +184,10 @@ public interface SchedulerService { * @param sessionHandle handle for the session. * @param jobHandle handle for the job * @param numResults - number of results to be returned, default 100. - * @return list of instance ids for the job + * @return list of instances for the job * @throws LensException the lens exception */ - List<String> getJobInstances(LensSessionHandle sessionHandle, + List<SchedulerJobInstanceInfo> getJobInstances(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle, Long numResults) throws LensException; /** http://git-wip-us.apache.org/repos/asf/lens/blob/265c4667/lens-server/src/main/java/org/apache/lens/server/scheduler/LensScheduler.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/LensScheduler.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/LensScheduler.java new file mode 100644 index 0000000..d624921 --- /dev/null +++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/LensScheduler.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lens.server.scheduler; + +import org.apache.lens.api.scheduler.SchedulerJobHandle; +import org.apache.lens.api.scheduler.XJob; + +/** + * Core work engine for the Lens Scheduler. + */ +public final class LensScheduler { + + private static final LensScheduler INSTANCE = new LensScheduler(); + + // private constructor to ensure single instance. + private LensScheduler(){} + + /** + * + * @return the singleton instance of the scheduler. + */ + public static LensScheduler get(){ + return INSTANCE; + } + + public void schedule(SchedulerJobHandle handle, XJob job) { + + } + + + +} http://git-wip-us.apache.org/repos/asf/lens/blob/265c4667/lens-server/src/main/java/org/apache/lens/server/scheduler/ScheduleResource.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/ScheduleResource.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/ScheduleResource.java index abc4621..39c4d98 100644 --- a/lens-server/src/main/java/org/apache/lens/server/scheduler/ScheduleResource.java +++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/ScheduleResource.java @@ -18,20 +18,189 @@ */ package org.apache.lens.server.scheduler; -import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; +import java.util.Collection; +import java.util.List; + +import javax.ws.rs.*; import javax.ws.rs.core.MediaType; +import org.apache.lens.api.APIResult; +import org.apache.lens.api.LensSessionHandle; +import org.apache.lens.api.scheduler.*; +import org.apache.lens.server.LensServices; +import org.apache.lens.server.api.error.LensException; +import org.apache.lens.server.api.scheduler.SchedulerService; + +import org.apache.commons.lang3.StringUtils; + /** - * The Class ScheduleResource. + * REST end point for all scheduler operations. */ -@Path("/queryscheduler") +@Path("scheduler") +@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) public class ScheduleResource { + public static enum INSTANCE_ACTIONS { + KILL, RERUN; + + public static INSTANCE_ACTIONS fromString(String name) { + return valueOf(name.toUpperCase()); + } + } + + public static enum JOB_ACTIONS { + SCHEDULE, EXPIRE, SUSPEND, RESUME; + + public static JOB_ACTIONS fromString(String name) { + return valueOf(name.toUpperCase()); + } + } + + public static SchedulerService getSchedulerService() { + return LensServices.get().getService(SchedulerService.NAME); + } + + private static void validateSession(LensSessionHandle sessionHandle) throws LensException { + getSchedulerService().validateSession(sessionHandle); + } + @GET @Produces(MediaType.TEXT_PLAIN) public String getMessage() { - return "Hello World! from scheduler"; + return "Scheduler is running."; } + + @POST + @Consumes({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON}) + @Path("jobs") + public SchedulerJobHandle submitJob(@QueryParam("sessionid") LensSessionHandle sessionId, + @DefaultValue("") @QueryParam("action") String action, + XJob job) throws LensException { + validateSession(sessionId); + if (StringUtils.isBlank(action)) { + return getSchedulerService().submitJob(sessionId, job); + } else if (StringUtils.equalsIgnoreCase(action, "submit-and-schedule")) { + return getSchedulerService().submitAndScheduleJob(sessionId, job); + } else { + throw new BadRequestException("Optional Query param 'action' can only be 'submit-and-schedule'"); + } + } + + @GET + @Path("jobs/stats") + public Collection<SchedulerJobStats> getAllJobStats(@QueryParam("sessionid") LensSessionHandle sessionId, + @DefaultValue("running") @QueryParam("state") String state, + @QueryParam("name") String jobName, + @DefaultValue("user") @QueryParam("user") String user, + @DefaultValue("-1") @QueryParam("start") long start, + @DefaultValue("-1") @QueryParam("end") long end) throws LensException { + return getSchedulerService().getAllJobStats(sessionId, state, user, start, end); + } + + @GET + @Path("jobs/{jobHandle}") + public XJob getJobDefinition(@QueryParam("sessionid") LensSessionHandle sessionId, + @PathParam("jobHandle") SchedulerJobHandle jobHandle) throws LensException { + + return getSchedulerService().getJobDefinition(sessionId, jobHandle); + } + + @DELETE + @Path("jobs/{jobHandle}") + public APIResult deleteJob(@QueryParam("sessionid") LensSessionHandle sessionId, + @QueryParam("jobHandle") SchedulerJobHandle jobHandle) throws LensException { + validateSession(sessionId); + getSchedulerService().deleteJob(sessionId, jobHandle); + return APIResult.success(); + } + + @PUT + @Path("jobs/{jobHandle}/") + @Consumes({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON}) + public APIResult updateJob(@QueryParam("sessionid") LensSessionHandle sessionId, + @PathParam("jobHandle") SchedulerJobHandle jobHandle, XJob job) throws LensException { + validateSession(sessionId); + getSchedulerService().updateJob(sessionId, jobHandle, job); + return APIResult.success(); + } + + @POST + @Path("jobs/{jobHandle}") + public APIResult updateJob(@QueryParam("sessionid") LensSessionHandle sessionId, + @PathParam("jobHandle") SchedulerJobHandle jobHandle, + @DefaultValue("schedule") @QueryParam("action") JOB_ACTIONS action) throws LensException { + validateSession(sessionId); + switch (action) { + + case SCHEDULE: + getSchedulerService().scheduleJob(sessionId, jobHandle); + break; + + case EXPIRE: + getSchedulerService().expireJob(sessionId, jobHandle); + break; + + case SUSPEND: + getSchedulerService().suspendJob(sessionId, jobHandle); + break; + + case RESUME: + getSchedulerService().resumeJob(sessionId, jobHandle); + break; + + default: + throw new BadRequestException("Unsupported action " + action.toString()); + } + return APIResult.success(); + } + + @GET + @Path("jobs/{jobHandle}/stats") + public SchedulerJobInfo getJobDetails(@QueryParam("sessionid") LensSessionHandle sessionId, + @PathParam("jobHandle") SchedulerJobHandle jobHandle) throws LensException { + validateSession(sessionId); + return getSchedulerService().getJobDetails(sessionId, jobHandle); + } + + @GET + @Path("jobs/{jobHandle}/instances/") + public List<SchedulerJobInstanceInfo> getJobInstances(@QueryParam("sessionid") LensSessionHandle sessionId, + @PathParam("jobHandle") SchedulerJobHandle jobHandle, + @QueryParam("numResults") Long numResults) throws LensException { + validateSession(sessionId); + return getSchedulerService().getJobInstances(sessionId, jobHandle, numResults); + } + + @GET + @Path("instances/{instanceHandle}") + public SchedulerJobInstanceInfo getInstanceDetails(@QueryParam("sessionid") LensSessionHandle sessionId, + @PathParam("instanceHandle") + SchedulerJobInstanceHandle instanceHandle) throws LensException { + validateSession(sessionId); + return getSchedulerService().getInstanceDetails(sessionId, instanceHandle); + } + + @POST + @Path("instances/{instanceHandle}") + public APIResult updateInstance(@QueryParam("sessionid") LensSessionHandle sessionId, + @PathParam("instanceHandle") SchedulerJobInstanceHandle instanceHandle, + @QueryParam("action") INSTANCE_ACTIONS action) throws LensException { + validateSession(sessionId); + + switch (action) { + case KILL: + getSchedulerService().killInstance(sessionId, instanceHandle); + break; + + case RERUN: + getSchedulerService().rerunInstance(sessionId, instanceHandle); + break; + + default: + throw new BadRequestException("Unsupported action " + action.toString()); + + } + return APIResult.success(); + } + } http://git-wip-us.apache.org/repos/asf/lens/blob/265c4667/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java index 706d54a..3952671 100644 --- a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java @@ -19,34 +19,44 @@ package org.apache.lens.server.scheduler; import java.util.Collection; -import java.util.Date; import java.util.List; +import java.util.UUID; import org.apache.lens.api.LensSessionHandle; import org.apache.lens.api.scheduler.*; import org.apache.lens.server.BaseLensService; +import org.apache.lens.server.LensServerConf; import org.apache.lens.server.api.error.LensException; import org.apache.lens.server.api.health.HealthStatus; import org.apache.lens.server.api.scheduler.SchedulerService; +import org.apache.lens.server.session.LensSessionImpl; import org.apache.hive.service.cli.CLIService; + /** - * The Class QuerySchedulerService. + * This class handles all the scheduler operations. */ public class SchedulerServiceImpl extends BaseLensService implements SchedulerService { + // get the state store + private SchedulerDAO schedulerDAO; + + private LensScheduler scheduler; /** * The constant name for scheduler service. */ public static final String NAME = "scheduler"; - /** - * Instantiates a new scheduler service. - * - * @param cliService the cli service - */ - public SchedulerServiceImpl(CLIService cliService) { + public SchedulerServiceImpl(CLIService cliService) throws LensException { + super(NAME, cliService); + this.schedulerDAO = new SchedulerDAO(LensServerConf.getHiveConf()); + this.scheduler = LensScheduler.get(); + } + + public SchedulerServiceImpl(CLIService cliService, SchedulerDAO schedulerDAO) { super(NAME, cliService); + this.schedulerDAO = schedulerDAO; + this.scheduler = LensScheduler.get(); } /** @@ -64,6 +74,8 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe */ @Override public SchedulerJobHandle submitJob(LensSessionHandle sessionHandle, XJob job) throws LensException { + //TBD place holder code + LensSessionImpl session = getSession(sessionHandle); return null; } @@ -71,12 +83,17 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe * {@inheritDoc} */ @Override - public void scheduleJob(LensSessionHandle sessionHandle, - SchedulerJobHandle jobHandle) throws LensException { + public void scheduleJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException { + //TBD place holder code + // send the schedule request to the scheduler. + UUID externalID = jobHandle.getHandleId(); + // get the job from the database and schedule } @Override public SchedulerJobHandle submitAndScheduleJob(LensSessionHandle sessionHandle, XJob job) throws LensException { + //TBD place holder code + // take job, validate it, submit it(check duplicate, persist it), schedule it. return null; } @@ -85,6 +102,8 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe */ @Override public XJob getJobDefinition(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException { + //TBD place holder code + // get the job definition from the persisted store, return it. return null; } @@ -94,6 +113,7 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe @Override public SchedulerJobInfo getJobDetails(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException { + //TBD place holder code return null; } @@ -103,16 +123,18 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe @Override public boolean updateJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle, XJob newJobDefinition) throws LensException { + //TBD place holder code + XJob job = schedulerDAO.getJob(jobHandle); return false; } /** + * * {@inheritDoc} */ @Override - public void expireJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle, - Date expiryTime) throws LensException { - + public void expireJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException { + //TBD place holder code } /** @@ -120,6 +142,7 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe */ @Override public boolean suspendJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException { + //TBD place holder code return false; } @@ -127,8 +150,8 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe * {@inheritDoc} */ @Override - public boolean resumeJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle, - Date effectiveTime) throws LensException { + public boolean resumeJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException { + // TBD place holder code return false; } @@ -137,6 +160,9 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe */ @Override public boolean deleteJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException { + // TBD place holder code + // it should only be a soft delete. Later on we will make a purge service and that service will delete + // all the soft delete things. return false; } @@ -144,8 +170,10 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe * {@inheritDoc} */ @Override - public Collection<SchedulerJobStats> getAllJobStats(LensSessionHandle sessionHandle, String state, String user, - String jobName, long startTime, long endTime) throws LensException { + public Collection<SchedulerJobStats> getAllJobStats(LensSessionHandle sessionHandle, String state, String userName, + long startTime, long endTime) throws LensException { + // TBD place holder code + // validate that the state is a valid state (enum) return null; } @@ -155,6 +183,8 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe @Override public SchedulerJobStats getJobStats(LensSessionHandle sessionHandle, SchedulerJobHandle handle, String state, long startTime, long endTime) throws LensException { + // TBD place holder code + // validate that the state is a valid state (enum) return null; } @@ -164,6 +194,7 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe @Override public boolean rerunInstance(LensSessionHandle sessionHandle, SchedulerJobInstanceHandle instanceHandle) throws LensException { + //TBD place holder code return false; } @@ -171,15 +202,18 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe * {@inheritDoc} */ @Override - public List<String> getJobInstances(LensSessionHandle sessionHandle, + public List<SchedulerJobInstanceInfo> getJobInstances(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle, Long numResults) throws LensException { + // TBD place holder code + // By default return 100 results - make it configurable return null; } @Override public boolean killInstance(LensSessionHandle sessionHandle, SchedulerJobInstanceHandle instanceHandle) throws LensException { - return false; + // TBD place holder code + return true; } @@ -189,6 +223,7 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe @Override public SchedulerJobInstanceInfo getInstanceDetails(LensSessionHandle sessionHandle, SchedulerJobInstanceHandle instanceHandle) throws LensException { + // TBD place holder code return null; }