jerryshao commented on code in PR #7772:
URL: https://github.com/apache/gravitino/pull/7772#discussion_r2221342124
##########
core/src/main/java/org/apache/gravitino/job/JobManager.java:
##########
@@ -226,8 +267,243 @@ public JobEntity getJob(String metalake, String jobId) throws NoSuchJobException
});
}
+ @Override
+ public JobEntity runJob(String metalake, String jobTemplateName, Map<String, String> jobConf)
+ throws NoSuchJobTemplateException {
+ checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
+
+ // Check if the job template exists, will throw NoSuchJobTemplateException if it does not exist.
+ JobTemplateEntity jobTemplateEntity = getJobTemplate(metalake, jobTemplateName);
+
+ // Create staging directory.
+ // TODO(jerry). The job staging directory will be deleted using a background thread.
+ long jobId = idGenerator.nextId();
+ File jobStagingDir = new File(stagingDir, JobHandle.JOB_ID_PREFIX + jobId);
+ if (!jobStagingDir.mkdirs()) {
+ throw new RuntimeException(
+ String.format("Failed to create staging directory %s for job %s", jobStagingDir, jobId));
+ }
+
+ // Create a JobTemplate by replacing the template parameters with the jobConf values, and
+ // also downloading any necessary files from the URIs specified in the job template.
+ JobTemplate jobTemplate = createRuntimeJobTemplate(jobTemplateEntity, jobConf, jobStagingDir);
+
+ // Submit the job template to the job executor
+ String jobExecutionId;
+ try {
+ jobExecutionId = jobExecutor.submitJob(jobTemplate);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Failed to submit job template %s for execution", jobTemplate), e);
+ }
+
+ // Create a new JobEntity to represent the job
+ JobEntity jobEntity =
+ JobEntity.builder()
+ .withId(jobId)
+ .withJobExecutionId(jobExecutionId)
+ .withJobTemplateName(jobTemplateName)
+ .withStatus(JobHandle.Status.QUEUED)
+ .withNamespace(NamespaceUtil.ofJob(metalake))
+ .withAuditInfo(
+ AuditInfo.builder()
+ .withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+ .withCreateTime(Instant.now())
+ .build())
+ .build();
+
+ try {
+ entityStore.put(jobEntity, false /* overwrite */);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to register the job entity " + jobEntity, e);
Review Comment:
I would leave it as it is. If the entity store fails, there are likely more
serious issues, and trying to keep things consistent in that case does not seem
worthwhile and makes things more complicated.
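
For context, the alternative being declined here would look roughly like the
sketch below: if registering the job fails after it was already submitted, try
to cancel the submitted job before rethrowing. This is only an illustration of
the extra handling involved. `Executor`, `Store`, and `submitAndRegister` are
hypothetical stand-ins, not Gravitino's `JobExecutor` or `EntityStore` APIs.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class CompensatingSubmit {
  /** Hypothetical stand-in for a job executor (not Gravitino's JobExecutor). */
  interface Executor {
    String submit(String template);

    void cancel(String executionId);
  }

  /** Hypothetical stand-in for an entity store whose put may fail. */
  interface Store {
    void put(String executionId) throws IOException;
  }

  /** Submits, then registers; if registration fails, attempts to cancel the job. */
  static String submitAndRegister(Executor executor, Store store, String template) {
    String executionId = executor.submit(template);
    try {
      store.put(executionId);
      return executionId;
    } catch (IOException e) {
      try {
        // Compensating action: the job is already running, so try to stop it.
        executor.cancel(executionId);
      } catch (RuntimeException cancelFailure) {
        // The cancel itself can fail, which is part of the added complexity.
        e.addSuppressed(cancelFailure);
      }
      throw new RuntimeException("Failed to register job " + executionId, e);
    }
  }

  public static void main(String[] args) {
    List<String> cancelled = new ArrayList<>();
    Executor executor =
        new Executor() {
          public String submit(String template) {
            return "exec-1";
          }

          public void cancel(String executionId) {
            cancelled.add(executionId);
          }
        };
    Store failingStore =
        id -> {
          throw new IOException("store unavailable");
        };
    try {
      submitAndRegister(executor, failingStore, "demo-template");
    } catch (RuntimeException e) {
      System.out.println(e.getMessage() + ", cancelled=" + cancelled);
    }
  }
}
```

Even in this small form the cancel path needs its own failure handling, which
is the added complexity the comment above refers to.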