[ 
https://issues.apache.org/jira/browse/SQOOP-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14195797#comment-14195797
 ] 

Hudson commented on SQOOP-1510:
-------------------------------

ABORTED: Integrated in Sqoop2-hadoop200 #578 (See 
[https://builds.apache.org/job/Sqoop2-hadoop200/578/])
SQOOP-1510: Sqoop2: Refactor JobRequestHandler for submit/abort job and 
SubmissionHandler for get operation only (abraham: 
https://git-wip-us.apache.org/repos/asf?p=sqoop.git&a=commit&h=268a47552f5758c2d9caa5e859bd3a631baaedc9)
* server/src/main/java/org/apache/sqoop/server/v1/SubmissionServlet.java
* common/src/main/java/org/apache/sqoop/json/JsonBean.java
* common/src/main/java/org/apache/sqoop/json/JobBean.java
* shell/src/main/java/org/apache/sqoop/shell/UpdateJobFunction.java
* client/src/main/java/org/apache/sqoop/client/request/JobResourceRequest.java
* common/src/main/java/org/apache/sqoop/model/MSubmission.java
* repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaInsertUpdateDeleteSelectQuery.java
* server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
* core/src/main/java/org/apache/sqoop/driver/DriverError.java
* common/src/main/java/org/apache/sqoop/json/SubmissionsBean.java
* shell/src/main/java/org/apache/sqoop/shell/StartJobFunction.java
* server/src/main/webapp/WEB-INF/web.xml
* tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryDumpTool.java
* core/src/main/java/org/apache/sqoop/driver/JobManager.java
* core/src/main/java/org/apache/sqoop/repository/Repository.java
* server/src/main/java/org/apache/sqoop/server/v1/JobServlet.java
* test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
* common/src/test/java/org/apache/sqoop/json/TestJobBean.java
* server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java
* shell/src/main/java/org/apache/sqoop/shell/StopJobFunction.java
* client/src/main/java/org/apache/sqoop/client/SqoopClient.java
* core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
* server/src/main/java/org/apache/sqoop/server/v1/SubmissionsServlet.java
* core/src/main/java/org/apache/sqoop/driver/JobRequest.java
* shell/src/main/java/org/apache/sqoop/shell/ShowJobStatusFunction.java
* shell/src/main/java/org/apache/sqoop/shell/StatusJobFunction.java
* shell/src/main/java/org/apache/sqoop/shell/StatusCommand.java
* server/src/main/java/org/apache/sqoop/server/v1/JobsServlet.java
* common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java
* common/src/main/java/org/apache/sqoop/json/JobsBean.java
* core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
* client/src/main/java/org/apache/sqoop/client/request/SqoopResourceRequests.java
* client/src/main/java/org/apache/sqoop/client/SubmissionCallback.java
* repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
* test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java
* common/src/main/java/org/apache/sqoop/json/SubmissionBean.java
* shell/src/main/java/org/apache/sqoop/shell/UpdateLinkFunction.java
* client/src/main/java/org/apache/sqoop/client/request/SubmissionResourceRequest.java


> Sqoop2: Refactor JobRequestHandler for submit/abort job and SubmissionHandler 
> for get operation only
> ----------------------------------------------------------------------------------------------------
>
>                 Key: SQOOP-1510
>                 URL: https://issues.apache.org/jira/browse/SQOOP-1510
>             Project: Sqoop
>          Issue Type: Sub-task
>            Reporter: Veena Basavaraj
>            Assignee: Veena Basavaraj
>         Attachments: SQOOP-1510.patch, SQOOP-1510.patch, SQOOP-1510.patch, 
> SQOOP-1510.patch
>
>
> PROPOSAL GIST:
> 1. Moves the actions of submitting a job, getting the status of the currently 
> running job, and stopping the currently running job to the JOB resource. A 
> submit action creates a submission record. To be consistent with 
> v1/connectors and v1/links, we also have v1/jobs to display all jobs and 
> v1/jobs?cname to display jobs by connector.
> Also proposing (note this can be another RB if it is hard to review the 
> current one):
>  START -> renamed to SUBMIT
>  STOP -> renamed to ABORT
> 2. Submission is a read-only resource that can give all the submissions or 
> the submissions per job. Since it is a collection resource, it uses 
> v1/submissions to be consistent with v1/connectors, v1/links and v1/jobs.
> 3. Changes to the client and shell to reflect 1 and 2.
> PROPOSAL DETAILS:
> Adding more details to the ticket to explain how Job and Submission are 
> structured in the code.
> In Sqoop we create a job by giving it the FROM and the TO link ids, 
> something like this:
> {code}
> create job -fromLink 1 -toLink 2
> {code}
> A job in Sqoop is a representation of all the data required for the job we 
> will submit to the execution engine. Hence the job in Sqoop holds the FROM 
> link and its FROM connector, and the TO link and its corresponding TO 
> connector details. It also holds all the corresponding config values needed 
> to invoke the FROM connector code and the TO connector code as part of the 
> job lifecycle (init, partitions, load, extract, destroy) steps.
> So once the job is created, we can perform the following actions on it:
> 1. disable / re-enable it
> 2. submit the job to the execution engine
> 3. at any point while it is running, abort it (we can also call this stop if 
> we want to)
> All of the below is handled by {code}JobManager{code}, an internal class 
> that receives the requests for operations on the job resource.
> So what does submit really do?
> 1. It creates a job request for the execution engine. This is an uber object 
> that holds all the information described above, such as the FROM connector 
> and TO connector details. It also holds a reference to the submission 
> object. The submission object holds the results of the submit action. A new 
> submission record is persisted into the Sqoop repository every time we call 
> a job submit. This is represented as {code}MSubmission{code} internally.
> {code}
>     // Bootstrap job to execute in the configured execution engine
>     prepareJob(jobRequest);
> {code}
> 2. We then call the submissionEngine API to submit, which in turn chooses 
> the configured execution engine (such as MR, or Spark in future) and then 
> submits to it.
> {code}
>       boolean success = submissionEngine.submit(jobRequest);
> {code}
> {code}
>       // If we're in local mode then wait on completion. The local job runner
>       // does not seem to expose an API to get a previously submitted job, which
>       // makes other methods of the submission engine quite useless.
>       if(isLocal()) {
>         job.waitForCompletion(true);
>       } else {
>         job.submit();
>       }
> {code}
> For consistency we call both the submission and execution APIs submit, since 
> the MR engine and Spark also use submit.
> 3. Once we succeed we persist the record in the Sqoop repo.
> {code}
>       RepositoryManager.getInstance().getRepository().createSubmission(mJobSubmission);
> {code}
> 4. If the execution engine fails, we call some cleanup code, but still 
> persist the submission record in the repo to record the "submit" action 
> invoked from the client / REST API.
> {code}
>  public MSubmission submit(long jobId, HttpEventContext ctx) {
>     MSubmission mJobSubmission = createJobSubmission(ctx, jobId);
>     JobRequest jobRequest = createJobRequest(jobId, mJobSubmission);
>     // Bootstrap job to execute in the configured execution engine
>     prepareJob(jobRequest);
>     // Make sure that this job id is not currently running and submit the job
>     // only if it's not.
>     synchronized (getClass()) {
>       MSubmission lastSubmission = RepositoryManager.getInstance().getRepository()
>           .findLastSubmissionForJob(jobId);
>       if (lastSubmission != null && lastSubmission.getStatus().isRunning()) {
>         throw new SqoopException(DriverError.DRIVER_0002, "Job with id " + jobId);
>       }
>       // TODO(jarcec): We might need to catch all exceptions here to ensure
>       // that Destroyer will be executed in all cases.
>       
>       // NOTE: the following is a blocking call
>       boolean success = submissionEngine.submit(jobRequest);
>       if (!success) {
>         cleanUpOnJobSubmissionFailure(jobRequest);
>         mJobSubmission.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT);
>       }
>       // persist submission record to repository and success status
>       RepositoryManager.getInstance().getRepository().createSubmission(mJobSubmission);
>     }
>     return mJobSubmission;
>   }
> {code}
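The guard-then-persist logic of the submit method above can be sketched in isolation. This is a minimal sketch, not Sqoop code: SketchStatus, SketchSubmission, SketchEngine and SketchJobManager are hypothetical stand-ins, an in-memory map replaces the repository, and a successful hand-off is assumed to leave the submission in a running state. It only illustrates the two rules described above: a job whose last submission is still running is rejected, and a submission record is kept even when the engine does not accept the job.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical status values for illustration; not Sqoop's SubmissionStatus.
enum SketchStatus { RUNNING, SUCCEEDED, FAILURE_ON_SUBMIT }

// Hypothetical stand-in for MSubmission.
class SketchSubmission {
    final long jobId;
    // Assumption: a submission that the engine accepts starts out as RUNNING.
    SketchStatus status = SketchStatus.RUNNING;

    SketchSubmission(long jobId) {
        this.jobId = jobId;
    }
}

// Hypothetical engine abstraction: returns false when the hand-off fails.
interface SketchEngine {
    boolean submit(long jobId);
}

class SketchJobManager {
    // In-memory replacement for the repository: job id -> submission history.
    private final Map<Long, List<SketchSubmission>> history = new HashMap<>();
    private final SketchEngine engine;

    SketchJobManager(SketchEngine engine) {
        this.engine = engine;
    }

    SketchSubmission lastSubmission(long jobId) {
        List<SketchSubmission> list = history.get(jobId);
        return (list == null || list.isEmpty()) ? null : list.get(list.size() - 1);
    }

    int submissionCount(long jobId) {
        List<SketchSubmission> list = history.get(jobId);
        return list == null ? 0 : list.size();
    }

    // Synchronized so that the check and the submit happen atomically.
    synchronized SketchSubmission submit(long jobId) {
        SketchSubmission last = lastSubmission(jobId);
        if (last != null && last.status == SketchStatus.RUNNING) {
            throw new IllegalStateException("Job with id " + jobId + " is already running");
        }
        SketchSubmission submission = new SketchSubmission(jobId);
        if (!engine.submit(jobId)) {
            submission.status = SketchStatus.FAILURE_ON_SUBMIT;
        }
        // Persist the record whether or not the engine accepted the job.
        history.computeIfAbsent(jobId, k -> new ArrayList<>()).add(submission);
        return submission;
    }
}
```

With an engine that always accepts, a second submit for the same job id throws while the first is still marked running; with an engine that always refuses, the returned record carries FAILURE_ON_SUBMIT and is still counted in the history.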
> Next, what happens if we want to stop/abort the running job?
> 1. The stop action has no effect if no job is currently running: as the code 
> below shows, the request is rejected with DRIVER_0003. We have an API that 
> returns the last submission for a job, and we use it to check whether the 
> requested job is in the running state.
> {code}
>  // remove the last running job submission
>     MSubmission mSubmission = repository.findLastSubmissionForJob(jobId);
>     if (mSubmission == null || !mSubmission.getStatus().isRunning()) {
>       throw new SqoopException(DriverError.DRIVER_0003, "Job with id " + jobId
>           + " is not running hence cannot stop");
>     }
>    
> {code}
> 2. If a job was running, we call the submission engine API to stop, which 
> then calls the corresponding execution engine API to kill the job.
> {code}
>     submissionEngine.stop(mSubmission.getExternalId());
> {code}
> For consistency we call all of these APIs stop/abort.
> {code}
>       RunningJob runningJob = jobClient.getJob(JobID.forName(submissionId));
>       if(runningJob == null) {
>         return;
>       }
>       runningJob.killJob();
> {code}
> 3.  Finally we update the status of the submission in the repo by calling 
> {code}updateSubmission{code}
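The three stop/abort steps above can be sketched as follows. This is a minimal sketch under stated assumptions, not Sqoop code: AbortStatus, AbortRecord, KillableEngine and AbortSketch are hypothetical names, the ABORTED status value is invented for illustration, and setting the status in memory stands in for the updateSubmission call.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical status values for illustration; not Sqoop's SubmissionStatus.
enum AbortStatus { RUNNING, SUCCEEDED, ABORTED }

// Hypothetical stand-in for MSubmission, reduced to what abort needs.
class AbortRecord {
    final String externalId;
    AbortStatus status;

    AbortRecord(String externalId, AbortStatus status) {
        this.externalId = externalId;
        this.status = status;
    }
}

// Hypothetical engine abstraction with a single kill operation.
interface KillableEngine {
    void stop(String externalId);
}

class AbortSketch {
    // In-memory replacement for the repository: job id -> last submission.
    private final Map<Long, AbortRecord> lastSubmissionByJob = new HashMap<>();
    private final KillableEngine engine;

    AbortSketch(KillableEngine engine) {
        this.engine = engine;
    }

    void recordLastSubmission(long jobId, AbortRecord record) {
        lastSubmissionByJob.put(jobId, record);
    }

    AbortRecord abort(long jobId) {
        // Step 1: reject the request unless the last submission is running.
        AbortRecord last = lastSubmissionByJob.get(jobId);
        if (last == null || last.status != AbortStatus.RUNNING) {
            throw new IllegalStateException(
                "Job with id " + jobId + " is not running hence cannot stop");
        }
        // Step 2: ask the engine to kill the job by its external id.
        engine.stop(last.externalId);
        // Step 3: record the new status (stands in for updateSubmission).
        last.status = AbortStatus.ABORTED;
        return last;
    }
}
```

Aborting the same job twice fails the second time, because the first call already moved the last submission out of the running state.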
> So in summary, the actions on a job are reflected on the job resource.
> The job will have enable / disable / submit / abort (stop), and also get for 
> the status and the job details.
> Submission is a read-only resource which represents the side effect of 
> submitting a job to the Sqoop execution engine.
> While we change the REST APIs to support the above, we also have to make the 
> corresponding changes in the Sqoop client.
> Here are the details of the Sqoop client. {code}SqoopClient{code} will 
> support submit / abort (stop) of a job.
> 1. The client has limited visibility into the job submission.  After a job 
> submit command is issued it can monitor the status of the job that is 
> submitted. 
> Hence we have this enum JobSubmissionStatus, at any point we will reply back 
> with one of the following 3 states. Started means it just got kicked off. 
> Updated mean 
> {code}
>   /**
>    * Status flags used when updating the job submission callback status
>    */
>   private enum JobSubmissionStatus {
>     STARTED,
>     UPDATED,
>     FINISHED
>   }
> {code}
> 2. The client will call the {code}SqoopResourceRequests{code} method to make 
> a REST call for the "submit" action and get back a submission record. It can 
> use this submission record to query the status if need be, and give the 
> client more real-time feedback on the status of the job using the submission 
> callback logic.
> {code}
>     MSubmission submission = resourceRequests.submitJob(jobId).getSubmissions().get(0);
> {code}
> {code}
>   public MSubmission submitJob(long jobId, SubmissionCallback callback, long pollTime)
>       throws InterruptedException {
>     if(pollTime <= 0) {
>       throw new SqoopException(ClientError.CLIENT_0002);
>     }
>     boolean started = true;
>     MSubmission submission = resourceRequests.submitJob(jobId).getSubmissions().get(0);
>     while(submission.getStatus().isRunning()) {
>       if(started) {
>         submissionCallback(callback, submission, JobSubmissionStatus.STARTED);
>         started = false;
>       } else {
>         submissionCallback(callback, submission, JobSubmissionStatus.UPDATED);
>       }
>       Thread.sleep(pollTime);
>       submission = getJobStatus(jobId);
>     }
>     submissionCallback(callback, submission, JobSubmissionStatus.FINISHED);
>     return submission;
>   }
> {code}
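The callback sequence produced by the polling loop above can be sketched without a server. This is a minimal sketch under stated assumptions: CallbackPhase mirrors the three JobSubmissionStatus values, while PollingSketch, its poll method and the BooleanSupplier/Consumer parameters are hypothetical replacements for the real client, the status query and SubmissionCallback. Unlike the quoted method, the sketch wraps InterruptedException in an unchecked exception to keep the example short.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

// Mirrors the three states of the client-side JobSubmissionStatus enum.
enum CallbackPhase { STARTED, UPDATED, FINISHED }

class PollingSketch {
    // isRunning stands in for "fetch the submission and check isRunning()";
    // callback stands in for the SubmissionCallback dispatch.
    static void poll(BooleanSupplier isRunning, Consumer<CallbackPhase> callback,
                     long pollTimeMillis) {
        if (pollTimeMillis <= 0) {
            throw new IllegalArgumentException("pollTime must be positive");
        }
        boolean first = true;
        while (isRunning.getAsBoolean()) {
            // The first observation of a running job reports STARTED,
            // every later one reports UPDATED.
            callback.accept(first ? CallbackPhase.STARTED : CallbackPhase.UPDATED);
            first = false;
            try {
                Thread.sleep(pollTimeMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while polling", e);
            }
        }
        // Always reported once, even if the job was never seen running.
        callback.accept(CallbackPhase.FINISHED);
    }
}
```

A job that is observed running three times yields STARTED, UPDATED, UPDATED, FINISHED; a job that has already finished by the first check yields only FINISHED.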
> 3. Similarly, for the stop action it calls the JobResource to make a REST 
> call to the server for stopping the job.
> {code}
>   public MSubmission stopJob(long jid) {
>     return resourceRequests.stopJob(jid).getSubmissions().get(0);
>   }
> {code}
> Why is the action called SUBMIT instead of START?
> SUBMIT is a common term in both the MR and Spark execution engines. Also, as 
> a side effect of SUBMIT we create a SUBMISSION record and store it in the 
> repository.
> There are many places in the current code and in the javadocs where we 
> actually mean submit when we say start. So why not just call it submit?
>   DRIVER_0008("Invalid combination of submission and execution engines"),
>   DRIVER_0009("Job has been disabled. Cannot submit this job."),
>   DRIVER_0010("Link for this job has been disabled. Cannot submit this job."),
>   DRIVER_0011("Connector does not support specified direction. Cannot submit this job."),



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
