[
https://issues.apache.org/jira/browse/SQOOP-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14196470#comment-14196470
]
ASF subversion and git services commented on SQOOP-1510:
--------------------------------------------------------
Commit fedc12a65e74580f226722914d6b793129bca6ce in sqoop's branch
refs/heads/branch-1.99.4 from [~abec]
[ https://git-wip-us.apache.org/repos/asf?p=sqoop.git;h=fedc12a ]
SQOOP-1510: Sqoop2: Refactor JobRequestHandler for submit/abort job and
SubmissionHandler for get operation only
(Veena Basavaraj via Abraham Elmahrek)
> Sqoop2: Refactor JobRequestHandler for submit/abort job and SubmissionHandler
> for get operation only
> ----------------------------------------------------------------------------------------------------
>
> Key: SQOOP-1510
> URL: https://issues.apache.org/jira/browse/SQOOP-1510
> Project: Sqoop
> Issue Type: Sub-task
> Reporter: Veena Basavaraj
> Assignee: Veena Basavaraj
> Attachments: SQOOP-1510.patch, SQOOP-1510.patch, SQOOP-1510.patch,
> SQOOP-1510.patch
>
>
> PROPOSAL GIST:
> 1. Move the actions of job submit, status of the currently running job, and
> stopping the currently running job to the JOB resource. A submit action creates
> a submission record. To be consistent with v1/connectors and v1/links, we
> also have v1/jobs to display all jobs and v1/jobs?cname to display jobs by connector.
> Also proposing (this can go into a separate RB if the current one is hard to
> review):
> START -> renamed to SUBMIT
> STOP -> renamed to ABORT
> 2. Submission is a read-only resource that can return all submissions or the
> submissions for a given job. Since it is a collection resource, it uses
> v1/submissions to be consistent with v1/connectors, v1/links and v1/jobs.
> 3. Changes to the client/shell to reflect 1 and 2.
> PROPOSAL DETAILS
> Adding more details to the ticket to explain how Job and Submission are
> structured in the code.
> In Sqoop we create a job by giving it the FROM and TO link ids,
> something like this:
> {code}
> create job -fromLink 1 -toLink 2
> {code}
> A job in Sqoop represents all the data required for the job we will
> submit to the execution engine. Hence the job holds the FROM link and its
> FROM connector, and the TO link and its TO connector details. It also holds
> all the corresponding config values needed to invoke the FROM connector code
> and the TO connector code during the job lifecycle steps (init, partitions,
> load, extract, destroy).
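> To make the shape of a job concrete, here is a minimal Java sketch. `JobSide` and `JobSketch` are hypothetical names, not Sqoop's real model classes, and the ids and config keys are placeholders. It only captures what the paragraph above says a job holds: a FROM and a TO link, the connector behind each, and the config values handed to each connector.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of one side (FROM or TO) of a job: the link, the connector
// that link belongs to, and the config values handed to that connector.
final class JobSide {
  final long linkId;
  final long connectorId;
  final Map<String, String> config;

  JobSide(long linkId, long connectorId, Map<String, String> config) {
    this.linkId = linkId;
    this.connectorId = connectorId;
    // Defensive, read-only copy so later edits by the caller do not leak into the job.
    this.config = Collections.unmodifiableMap(new HashMap<>(config));
  }
}

// Hypothetical job: everything needed to run the FROM and TO connector code
// through the lifecycle steps (init, partitions, load, extract, destroy).
final class JobSketch {
  final long id;
  final JobSide from;
  final JobSide to;
  boolean enabled = true; // jobs can be disabled and re-enabled

  JobSketch(long id, JobSide from, JobSide to) {
    this.id = id;
    this.from = from;
    this.to = to;
  }
}
```

> For `create job -fromLink 1 -toLink 2`, the FROM side would carry link id 1 and the TO side link id 2; in this sketch the connector ids are assumed to be resolved from those links.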
> Once the job is created, we can perform these 4 actions on it:
> 1. disable / re-enable it
> 2. submit it to the execution engine
> 3. abort it at any point while it is running (we could also call this stop)
> All of this is handled by {code}JobManager{code}, an internal class that
> receives the requests for operations on the job resource.
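> The four actions and the conditions under which each is accepted can be summarized in a small Java sketch. `JobAction` and `JobActionPolicy` are hypothetical; the rejection messages for a disabled job and for stopping a job that is not running are taken from this ticket, while the message for an already-running job is a placeholder, and allowing enable/disable at any time is an assumption.

```java
// Hypothetical names for the four actions available on a job resource.
enum JobAction { DISABLE, ENABLE, SUBMIT, ABORT }

final class JobActionPolicy {
  // Returns null if the action is accepted, otherwise the reason it is rejected.
  static String check(JobAction action, boolean jobEnabled, boolean hasRunningSubmission) {
    switch (action) {
      case DISABLE:
      case ENABLE:
        return null; // assumption: allowed at any time
      case SUBMIT:
        if (!jobEnabled) {
          return "Job has been disabled. Cannot submit this job.";
        }
        if (hasRunningSubmission) {
          return "Job is already running"; // placeholder message
        }
        return null;
      case ABORT:
        if (!hasRunningSubmission) {
          return "Job is not running hence cannot stop";
        }
        return null;
      default:
        throw new IllegalArgumentException("Unknown action: " + action);
    }
  }
}
```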
> So what does submit really do?
> 1. It creates a job request for the execution engine. This is an uber object that
> holds all the information described above, such as the FROM and TO connector
> details. It also holds a reference to the submission object, which records the
> results of the submit action. A new submission record is persisted into the
> Sqoop repository every time a job is submitted. Internally this is represented
> as {code}MSubmission{code}.
> {code}
> // Bootstrap job to execute in the configured execution engine
> prepareJob(jobRequest);
> {code}
> 2. We then call the submissionEngine API to submit, which in turn chooses
> the configured execution engine (such as MR, or Spark in the future) and
> submits the job to it.
> {code}
> boolean success = submissionEngine.submit(jobRequest);
> {code}
> {code}
> // If we're in local mode then wait on completion. The local job runner does
> // not seem to expose an API to get a previously submitted job, which makes
> // the other methods of the submission engine quite useless.
> if (isLocal()) {
>   job.waitForCompletion(true);
> } else {
>   job.submit();
> }
> {code}
> For consistency we name both the submission and execution APIs submit, since
> the MR engine and Spark also use submit.
> 3. Once the submit succeeds, we persist the record in the Sqoop repository.
> {code}
> RepositoryManager.getInstance().getRepository().createSubmission(mJobSubmission);
> {code}
> 4. If the execution engine fails, we run some clean-up code, but still persist
> the submission record in the repository to record the "submit" action invoked
> from the client/REST API.
> {code}
> public MSubmission submit(long jobId, HttpEventContext ctx) {
>   MSubmission mJobSubmission = createJobSubmission(ctx, jobId);
>   JobRequest jobRequest = createJobRequest(jobId, mJobSubmission);
>   // Bootstrap job to execute in the configured execution engine
>   prepareJob(jobRequest);
>   // Make sure that this job id is not currently running and submit the job
>   // only if it's not.
>   synchronized (getClass()) {
>     MSubmission lastSubmission =
>         RepositoryManager.getInstance().getRepository().findLastSubmissionForJob(jobId);
>     if (lastSubmission != null && lastSubmission.getStatus().isRunning()) {
>       throw new SqoopException(DriverError.DRIVER_0002, "Job with id " + jobId);
>     }
>     // TODO(jarcec): We might need to catch all exceptions here to ensure
>     // that Destroyer will be executed in all cases.
>
>     // NOTE: the following is a blocking call
>     boolean success = submissionEngine.submit(jobRequest);
>     if (!success) {
>       cleanUpOnJobSubmissionFailure(jobRequest);
>       mJobSubmission.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT);
>     }
>     // persist the submission record (with its success or failure status) to the repository
>     RepositoryManager.getInstance().getRepository().createSubmission(mJobSubmission);
>   }
>   return mJobSubmission;
> }
> {code}
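> The submit steps above can be exercised end to end with a self-contained, in-memory Java sketch. Everything here is hypothetical: `InMemoryRepository` stands in for the Sqoop repository, `Submission` for `MSubmission`, and a `LongPredicate` for `submissionEngine.submit(jobRequest)`. It shows three properties described above: every submit creates a fresh record, a job whose last submission is still running cannot be submitted again, and a failed engine submit is still persisted with `FAILURE_ON_SUBMIT`. Unlike the real engine, nothing here moves a successful submission past `BOOTING`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongPredicate;

// Hypothetical submission states; only BOOTING and RUNNING count as "running".
enum SubmitStatus {
  BOOTING, RUNNING, SUCCEEDED, FAILED, FAILURE_ON_SUBMIT;

  boolean isRunning() {
    return this == BOOTING || this == RUNNING;
  }
}

// Hypothetical stand-in for MSubmission: one record per submit call.
final class Submission {
  final long jobId;
  SubmitStatus status = SubmitStatus.BOOTING;

  Submission(long jobId) {
    this.jobId = jobId;
  }
}

// Hypothetical in-memory repository keeping every submission per job, oldest first.
final class InMemoryRepository {
  private final Map<Long, List<Submission>> byJob = new HashMap<>();

  void createSubmission(Submission s) {
    byJob.computeIfAbsent(s.jobId, k -> new ArrayList<>()).add(s);
  }

  Submission findLastSubmissionForJob(long jobId) {
    List<Submission> list = byJob.get(jobId);
    return (list == null || list.isEmpty()) ? null : list.get(list.size() - 1);
  }

  int countForJob(long jobId) {
    List<Submission> list = byJob.get(jobId);
    return list == null ? 0 : list.size();
  }
}

// Hypothetical job manager following the submit steps described above.
final class JobManagerSketch {
  final InMemoryRepository repository = new InMemoryRepository();
  // Stand-in for submissionEngine.submit(jobRequest): false means the engine failed.
  private final LongPredicate engineSubmit;

  JobManagerSketch(LongPredicate engineSubmit) {
    this.engineSubmit = engineSubmit;
  }

  // synchronized so the "is it already running?" check and the persist are atomic.
  synchronized Submission submit(long jobId) {
    Submission submission = new Submission(jobId); // a fresh record for every submit
    Submission last = repository.findLastSubmissionForJob(jobId);
    if (last != null && last.status.isRunning()) {
      throw new IllegalStateException("Job with id " + jobId + " is already running");
    }
    if (!engineSubmit.test(jobId)) {
      // Clean-up of the job request would happen here.
      submission.status = SubmitStatus.FAILURE_ON_SUBMIT;
    }
    // Persisted on both success and failure, recording the "submit" action.
    repository.createSubmission(submission);
    return submission;
  }
}
```

> The sketch locks the manager instance rather than the class, as the snippet above does with `getClass()`; with a single manager instance the effect is the same.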
> Next, what happens if we want to stop/abort the running job?
> 1. The stop action is rejected with an error if the job is not currently
> running. We have an API that returns the last submission for a job, and we use
> it to check whether the requested job is in a running state.
> {code}
> // find the last submission for this job and make sure it is still running
> MSubmission mSubmission = repository.findLastSubmissionForJob(jobId);
> if (mSubmission == null || !mSubmission.getStatus().isRunning()) {
>   throw new SqoopException(DriverError.DRIVER_0003, "Job with id " + jobId
>       + " is not running hence cannot stop");
> }
> {code}
> 2. If the job is running, we call the submission engine API to stop it, which
> in turn calls the corresponding execution engine API to kill the job:
> {code}
> submissionEngine.stop(mSubmission.getExternalId());
> {code}
> For consistency we call all of these APIs stop/abort. In the MR execution
> engine this looks like:
> {code}
> RunningJob runningJob = jobClient.getJob(JobID.forName(submissionId));
> if (runningJob == null) {
>   return;
> }
> runningJob.killJob();
> {code}
> 3. Finally, we update the status of the submission in the repository by calling
> {code}updateSubmission{code}.
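> The three stop steps can be sketched in Java as follows. `EngineControl`, `RunRecord` and `AbortSketch` are hypothetical names; reading the new status back from the engine after the kill (rather than assuming it worked) is an assumption of this sketch, and the status strings are placeholders rather than Sqoop's `SubmissionStatus` values.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical control surface of an execution engine (e.g. MR's killJob).
interface EngineControl {
  void stop(String externalId);     // ask the engine to kill the job
  String status(String externalId); // current status as reported by the engine
}

// Hypothetical stand-in for a submission record carrying the engine's job id.
final class RunRecord {
  final long jobId;
  final String externalId;
  String status;

  RunRecord(long jobId, String externalId, String status) {
    this.jobId = jobId;
    this.externalId = externalId;
    this.status = status;
  }
}

final class AbortSketch {
  private final Map<Long, RunRecord> lastByJob = new HashMap<>();
  private final EngineControl engine;

  AbortSketch(EngineControl engine) {
    this.engine = engine;
  }

  // Stand-in for persisting a submission (keeps only the latest one per job).
  void record(RunRecord r) {
    lastByJob.put(r.jobId, r);
  }

  RunRecord abort(long jobId) {
    // Step 1: look up the last submission; reject the request if it is not running.
    RunRecord last = lastByJob.get(jobId);
    if (last == null || !"RUNNING".equals(last.status)) {
      throw new IllegalStateException("Job with id " + jobId + " is not running hence cannot stop");
    }
    // Step 2: ask the execution engine to kill the job.
    engine.stop(last.externalId);
    // Step 3: refresh the status from the engine and update the stored record
    // (in this in-memory sketch the record object is updated in place).
    last.status = engine.status(last.externalId);
    return last;
  }
}
```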
> In summary, the actions on a job are reflected on the job resource.
> The job resource supports enable/disable/submit/abort (stop), as well as
> getting the job's status and details.
> Submission is a read-only resource that represents the side effect of
> submitting a job to the Sqoop execution engine.
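> What "read-only" means for the submission resource can be shown with a minimal Java sketch: it can list all submissions or those of one job, but exposes no create/update/delete operation, and the lists it returns cannot be modified. `SubmissionView` and `SubmissionResourceSketch` are hypothetical names; only the v1/submissions path comes from this ticket.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical read-only view of one submission.
final class SubmissionView {
  final long jobId;
  final String status;

  SubmissionView(long jobId, String status) {
    this.jobId = jobId;
    this.status = status;
  }
}

// Hypothetical read-only resource over submissions created by the job resource.
final class SubmissionResourceSketch {
  private final List<SubmissionView> store;

  SubmissionResourceSketch(List<SubmissionView> store) {
    this.store = store;
  }

  // All submissions (what v1/submissions would list).
  List<SubmissionView> all() {
    return Collections.unmodifiableList(new ArrayList<>(store));
  }

  // Submissions of a single job.
  List<SubmissionView> forJob(long jobId) {
    List<SubmissionView> result = new ArrayList<>();
    for (SubmissionView s : store) {
      if (s.jobId == jobId) {
        result.add(s);
      }
    }
    return Collections.unmodifiableList(result);
  }

  // Deliberately no create/update/delete methods: new records only appear as a
  // side effect of submitting a job through the job resource.
}
```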
> While we change the REST APIs to support the above, we also have to make the
> corresponding changes in the Sqoop client.
> Here are the details of the Sqoop client. {code}SqoopClient{code} will support
> submitting and aborting (stopping) a job.
> 1. The client has limited visibility into the job submission. After a job
> submit command is issued, it can monitor the status of the submitted job.
> Hence we have the enum JobSubmissionStatus; at any point we reply with one of
> the following 3 states: STARTED means the job just got kicked off, UPDATED
> means the job is still running and its status was refreshed by a poll, and
> FINISHED means the job is no longer running.
> {code}
> /**
> * Status flags used when updating the job submission callback status
> */
> private enum JobSubmissionStatus {
> STARTED,
> UPDATED,
> FINISHED
> }
> {code}
> 2. The client calls the {code}SqoopResourceRequest{code} method to make a
> REST call for the "submit" action and get back a submission record. It can
> use this submission record to query the status if needed, and to give the
> client more real-time feedback on the status of the job via the submission
> callback logic.
> {code}
> MSubmission submission =
> resourceRequests.submitJob(jobId).getSubmissions().get(0);
> {code}
> {code}
> public MSubmission submitJob(long jobId, SubmissionCallback callback, long pollTime)
>     throws InterruptedException {
>   if (pollTime <= 0) {
>     throw new SqoopException(ClientError.CLIENT_0002);
>   }
>   boolean started = true;
>   MSubmission submission = resourceRequests.submitJob(jobId).getSubmissions().get(0);
>   while (submission.getStatus().isRunning()) {
>     if (started) {
>       submissionCallback(callback, submission, JobSubmissionStatus.STARTED);
>       started = false;
>     } else {
>       submissionCallback(callback, submission, JobSubmissionStatus.UPDATED);
>     }
>     Thread.sleep(pollTime);
>     submission = getJobStatus(jobId);
>   }
>   submissionCallback(callback, submission, JobSubmissionStatus.FINISHED);
>   return submission;
> }
> {code}
> 3. Similarly, for the stop action it calls the JobResource to make a REST
> call to the server to stop the job.
> {code}
> public MSubmission stopJob(long jid) {
> return resourceRequests.stopJob(jid).getSubmissions().get(0);
> }
> {code}
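> The client-side polling from steps 1 and 2 can be reduced to a self-contained Java sketch that shows the order of callbacks. `CallbackPhase` and `PollingSketch` are hypothetical; the status type is generic, `isRunning` stands in for `getStatus().isRunning()`, and `poll` stands in for `getJobStatus(jobId)`.

```java
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Mirrors the JobSubmissionStatus values reported to the callback.
enum CallbackPhase { STARTED, UPDATED, FINISHED }

final class PollingSketch {
  // Reports STARTED on the first running status, UPDATED on every later running
  // status, and FINISHED once the status is no longer running.
  static <S> S pollUntilDone(S initial, Predicate<S> isRunning, Supplier<S> poll,
                             BiConsumer<CallbackPhase, S> callback, long pollMillis)
      throws InterruptedException {
    if (pollMillis <= 0) {
      throw new IllegalArgumentException("pollMillis must be positive");
    }
    boolean started = true;
    S current = initial;
    while (isRunning.test(current)) {
      callback.accept(started ? CallbackPhase.STARTED : CallbackPhase.UPDATED, current);
      started = false;
      Thread.sleep(pollMillis);
      current = poll.get();
    }
    callback.accept(CallbackPhase.FINISHED, current);
    return current;
  }
}
```

> With an initial status of RUNNING followed by polled statuses RUNNING and SUCCEEDED, the callback sees STARTED, then UPDATED, then FINISHED. A submission that is already finished when first returned produces only FINISHED.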
> Why is the action called SUBMIT instead of START?
> SUBMIT is a common term in both the MR and Spark execution engines. Also, as a
> side effect of SUBMIT we create a SUBMISSION record and store it in the
> repository. In many places in the code and in the Javadocs we actually mean
> submit when we say start, so why not just call it submit.
> DRIVER_0008("Invalid combination of submission and execution engines"),
> DRIVER_0009("Job has been disabled. Cannot submit this job."),
> DRIVER_0010("Link for this job has been disabled. Cannot submit this job."),
> DRIVER_0011("Connector does not support specified direction. Cannot submit this job."),
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)