jerryshao commented on code in PR #7772:
URL: https://github.com/apache/gravitino/pull/7772#discussion_r2227322845
##########
core/src/main/java/org/apache/gravitino/job/JobManager.java:
##########
@@ -226,8 +267,243 @@ public JobEntity getJob(String metalake, String jobId) throws NoSuchJobException
});
}
+ @Override
+ public JobEntity runJob(String metalake, String jobTemplateName, Map<String, String> jobConf)
+ throws NoSuchJobTemplateException {
+ checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
+
+ // Check if the job template exists, will throw NoSuchJobTemplateException if it does not exist.
+ JobTemplateEntity jobTemplateEntity = getJobTemplate(metalake, jobTemplateName);
+
+ // Create staging directory.
+ // TODO(jerry). The job staging directory will be deleted using a background thread.
+ long jobId = idGenerator.nextId();
+ File jobStagingDir = new File(stagingDir, JobHandle.JOB_ID_PREFIX + jobId);
+ if (!jobStagingDir.mkdirs()) {
+ throw new RuntimeException(
+ String.format("Failed to create staging directory %s for job %s", jobStagingDir, jobId));
+ }
+
+ // Create a JobTemplate by replacing the template parameters with the jobConf values, and
+ // also downloading any necessary files from the URIs specified in the job template.
+ JobTemplate jobTemplate = createRuntimeJobTemplate(jobTemplateEntity, jobConf, jobStagingDir);
+
+ // Submit the job template to the job executor
+ String jobExecutionId;
+ try {
+ jobExecutionId = jobExecutor.submitJob(jobTemplate);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Failed to submit job template %s for execution", jobTemplate), e);
+ }
+
+ // Create a new JobEntity to represent the job
+ JobEntity jobEntity =
+ JobEntity.builder()
+ .withId(jobId)
+ .withJobExecutionId(jobExecutionId)
+ .withJobTemplateName(jobTemplateName)
+ .withStatus(JobHandle.Status.QUEUED)
+ .withNamespace(NamespaceUtil.ofJob(metalake))
+ .withAuditInfo(
+ AuditInfo.builder()
+ .withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+ .withCreateTime(Instant.now())
+ .build())
+ .build();
+
+ try {
+ entityStore.put(jobEntity, false /* overwrite */);
Review Comment:
This will be used when listing jobs by job template.
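
For illustration only, a rough sketch of what such a listing could look like. It assumes a hypothetical in-memory store and a hypothetical `JobRecord` type that stand in for the real entity store and `JobEntity`; none of these names come from the PR, and the actual lookup may well be implemented differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: not the Gravitino API, only an in-memory stand-in.
public class JobListingSketch {

  // Hypothetical stand-in for JobEntity, keeping only the fields needed here.
  public static final class JobRecord {
    public final long id;
    public final String jobTemplateName;

    public JobRecord(long id, String jobTemplateName) {
      this.id = id;
      this.jobTemplateName = jobTemplateName;
    }
  }

  // Hypothetical stand-in for the entity store.
  private final List<JobRecord> store = new ArrayList<>();

  // Persist a job record together with the name of the template it was created from.
  public void put(JobRecord job) {
    store.add(job);
  }

  // Return every stored job that was created from the given job template.
  public List<JobRecord> listJobs(String jobTemplateName) {
    return store.stream()
        .filter(job -> job.jobTemplateName.equals(jobTemplateName))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    JobListingSketch sketch = new JobListingSketch();
    sketch.put(new JobRecord(1L, "spark-etl"));
    sketch.put(new JobRecord(2L, "shell-cleanup"));
    sketch.put(new JobRecord(3L, "spark-etl"));
    System.out.println(sketch.listJobs("spark-etl").size());
  }
}
```

The point of the sketch is only that the template name has to be stored with each job so that a per-template filter is possible later.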