mchades commented on code in PR #7772:
URL: https://github.com/apache/gravitino/pull/7772#discussion_r2225611609


##########
core/src/main/java/org/apache/gravitino/job/JobManager.java:
##########
@@ -226,8 +267,243 @@ public JobEntity getJob(String metalake, String jobId) throws NoSuchJobException
         });
   }
 
+  @Override
+  public JobEntity runJob(String metalake, String jobTemplateName, Map<String, String> jobConf)
+      throws NoSuchJobTemplateException {
+    checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
+
+    // Check if the job template exists, will throw NoSuchJobTemplateException if it does not exist.
+    JobTemplateEntity jobTemplateEntity = getJobTemplate(metalake, jobTemplateName);
+
+    // Create staging directory.
+    // TODO(jerry). The job staging directory will be deleted using a background thread.
+    long jobId = idGenerator.nextId();
+    File jobStagingDir = new File(stagingDir, JobHandle.JOB_ID_PREFIX + jobId);

Review Comment:
   Should the metalake directory be included in the staging path? This was raised in
   https://github.com/apache/gravitino/pull/7695#discussion_r2218125427
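
   A minimal sketch of the layout this comment seems to suggest, assuming the staging root is partitioned by metalake name. `StagingDirSketch`, `resolveJobStagingDir`, and the `jobIdPrefix` parameter are hypothetical names for illustration, not code from the PR:

```java
import java.io.File;

/** Hypothetical helper for illustration only; not part of the PR. */
final class StagingDirSketch {

  private StagingDirSketch() {}

  // Builds <stagingRoot>/<metalake>/<jobIdPrefix><jobId>, so that jobs from
  // different metalakes do not share one flat directory.
  static File resolveJobStagingDir(
      File stagingRoot, String metalake, String jobIdPrefix, long jobId) {
    File metalakeDir = new File(stagingRoot, metalake);
    return new File(metalakeDir, jobIdPrefix + jobId);
  }
}
```

   Because the quoted code already calls `mkdirs()`, the intermediate metalake directory would be created together with the job directory.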



##########
core/src/main/java/org/apache/gravitino/job/JobManager.java:
##########
@@ -226,8 +267,243 @@ public JobEntity getJob(String metalake, String jobId) throws NoSuchJobException
         });
   }
 
+  @Override
+  public JobEntity runJob(String metalake, String jobTemplateName, Map<String, String> jobConf)
+      throws NoSuchJobTemplateException {
+    checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
+
+    // Check if the job template exists, will throw NoSuchJobTemplateException if it does not exist.
+    JobTemplateEntity jobTemplateEntity = getJobTemplate(metalake, jobTemplateName);
+
+    // Create staging directory.
+    // TODO(jerry). The job staging directory will be deleted using a background thread.
+    long jobId = idGenerator.nextId();
+    File jobStagingDir = new File(stagingDir, JobHandle.JOB_ID_PREFIX + jobId);
+    if (!jobStagingDir.mkdirs()) {
+      throw new RuntimeException(
+          String.format("Failed to create staging directory %s for job %s", jobStagingDir, jobId));
+    }
+
+    // Create a JobTemplate by replacing the template parameters with the jobConf values, and
+    // also downloading any necessary files from the URIs specified in the job template.
+    JobTemplate jobTemplate = createRuntimeJobTemplate(jobTemplateEntity, jobConf, jobStagingDir);
+
+    // Submit the job template to the job executor
+    String jobExecutionId;
+    try {
+      jobExecutionId = jobExecutor.submitJob(jobTemplate);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          String.format("Failed to submit job template %s for execution", jobTemplate), e);
+    }
+
+    // Create a new JobEntity to represent the job
+    JobEntity jobEntity =
+        JobEntity.builder()
+            .withId(jobId)
+            .withJobExecutionId(jobExecutionId)
+            .withJobTemplateName(jobTemplateName)
+            .withStatus(JobHandle.Status.QUEUED)
+            .withNamespace(NamespaceUtil.ofJob(metalake))
+            .withAuditInfo(
+                AuditInfo.builder()
+                    .withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+                    .withCreateTime(Instant.now())
+                    .build())
+            .build();
+
+    try {
+      entityStore.put(jobEntity, false /* overwrite */);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to register the job entity " + jobEntity, e);
+    }
+
+    return jobEntity;
+  }
+
+  @Override
+  public JobEntity cancelJob(String metalake, String jobId) throws NoSuchJobException {
+    checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
+
+    // Retrieve the job entity, will throw NoSuchJobException if the job does not exist.
+    JobEntity jobEntity = getJob(metalake, jobId);
+
+    if (jobEntity.status() == JobHandle.Status.CANCELLED
+        || jobEntity.status() == JobHandle.Status.SUCCEEDED
+        || jobEntity.status() == JobHandle.Status.FAILED) {
+      // If the job is already cancelled, succeeded, or failed, we do not need to cancel it again.
+      return jobEntity;
+    }
+
+    // Cancel the job using the job executor
+    try {
+      jobExecutor.cancelJob(jobEntity.jobExecutionId());
+    } catch (Exception e) {
+      throw new RuntimeException(
+          String.format("Failed to cancel job with ID %s under metalake %s", jobId, metalake), e);
+    }
+
+    // Update the job status to CANCELING
+    JobEntity newJobEntity =
+        JobEntity.builder()
+            .withId(jobEntity.id())
+            .withJobExecutionId(jobEntity.jobExecutionId())
+            .withJobTemplateName(jobEntity.jobTemplateName())
+            .withStatus(JobHandle.Status.CANCELLING)
+            .withNamespace(jobEntity.namespace())
+            .withAuditInfo(
+                AuditInfo.builder()
+                    .withCreator(jobEntity.auditInfo().creator())
+                    .withCreateTime(jobEntity.auditInfo().createTime())
+                    .withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
+                    .withLastModifiedTime(Instant.now())
+                    .build())
+            .build();
+
+    try {
+      // Update the job entity in the entity store
+      entityStore.put(newJobEntity, true /* overwrite */);
+      return newJobEntity;
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format("Failed to update job entity %s to CANCELING status", newJobEntity), e);
+    }
+  }
+
   @Override
   public void close() throws IOException {
-    // TODO. Implement any necessary cleanup logic for the JobManager.
+    jobExecutor.close();
+    // TODO(jerry). Implement any necessary cleanup logic for the JobManager.
+  }
+
+  @VisibleForTesting
+  static JobTemplate createRuntimeJobTemplate(
+      JobTemplateEntity jobTemplateEntity, Map<String, String> jobConf, File stagingDir) {
+    String name = jobTemplateEntity.name();
+    String comment = jobTemplateEntity.comment();
+
+    JobTemplateEntity.TemplateContent content = jobTemplateEntity.templateContent();
+    String executable = fetchFileFromUri(content.executable(), stagingDir, TIMEOUT_IN_MS);
+
+    List<String> args =
+        content.arguments().stream()
+            .map(arg -> replacePlaceholder(arg, jobConf))
+            .collect(Collectors.toList());
+    Map<String, String> environments =
+        content.environments().entrySet().stream()
+            .collect(
+                Collectors.toMap(
+                    entry -> replacePlaceholder(entry.getKey(), jobConf),
+                    entry -> replacePlaceholder(entry.getValue(), jobConf)));
+    Map<String, String> customFields =
+        content.customFields().entrySet().stream()
+            .collect(
+                Collectors.toMap(
+                    entry -> replacePlaceholder(entry.getKey(), jobConf),
+                    entry -> replacePlaceholder(entry.getValue(), jobConf)));
+
+    // For shell job template
+    if (content.jobType() == JobTemplate.JobType.SHELL) {
+      List<String> scripts = fetchFilesFromUri(content.scripts(), stagingDir, TIMEOUT_IN_MS);
+
+      return new ShellJobTemplate.Builder()

Review Comment:
   Should we use `ShellJobTemplate.builder()` to align with the project's code 
style?
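
   To illustrate the style being asked for, here is a generic sketch of a class that exposes a static `builder()` factory and keeps the `Builder` constructor private. `ShellTemplateSketch` and its fields are hypothetical stand-ins; this is not the actual `ShellJobTemplate` API:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in used only to show the static builder() convention. */
final class ShellTemplateSketch {
  private final String name;
  private final List<String> scripts;

  private ShellTemplateSketch(Builder builder) {
    this.name = builder.name;
    // Defensive copy so later changes to the builder do not leak into the instance.
    this.scripts = Collections.unmodifiableList(new ArrayList<>(builder.scripts));
  }

  // Callers write ShellTemplateSketch.builder() instead of new ShellTemplateSketch.Builder().
  static Builder builder() {
    return new Builder();
  }

  String name() {
    return name;
  }

  List<String> scripts() {
    return scripts;
  }

  static final class Builder {
    private String name;
    private List<String> scripts = new ArrayList<>();

    // Private: the static factory on the outer class is the only entry point.
    private Builder() {}

    Builder withName(String name) {
      this.name = name;
      return this;
    }

    Builder withScripts(List<String> scripts) {
      this.scripts = new ArrayList<>(scripts);
      return this;
    }

    ShellTemplateSketch build() {
      return new ShellTemplateSketch(this);
    }
  }
}
```

   With this shape, the call site reads `ShellTemplateSketch.builder().withName(...).build()`, which matches the `JobEntity.builder()` and `AuditInfo.builder()` calls elsewhere in the quoted diff.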



##########
core/src/main/java/org/apache/gravitino/job/JobManager.java:
##########
@@ -226,8 +267,243 @@ public JobEntity getJob(String metalake, String jobId) throws NoSuchJobException
         });
   }
 
+  @Override
+  public JobEntity runJob(String metalake, String jobTemplateName, Map<String, String> jobConf)
+      throws NoSuchJobTemplateException {
+    checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
+
+    // Check if the job template exists, will throw NoSuchJobTemplateException if it does not exist.
+    JobTemplateEntity jobTemplateEntity = getJobTemplate(metalake, jobTemplateName);
+
+    // Create staging directory.
+    // TODO(jerry). The job staging directory will be deleted using a background thread.
+    long jobId = idGenerator.nextId();
+    File jobStagingDir = new File(stagingDir, JobHandle.JOB_ID_PREFIX + jobId);
+    if (!jobStagingDir.mkdirs()) {
+      throw new RuntimeException(
+          String.format("Failed to create staging directory %s for job %s", jobStagingDir, jobId));
+    }
+
+    // Create a JobTemplate by replacing the template parameters with the jobConf values, and
+    // also downloading any necessary files from the URIs specified in the job template.
+    JobTemplate jobTemplate = createRuntimeJobTemplate(jobTemplateEntity, jobConf, jobStagingDir);
+
+    // Submit the job template to the job executor
+    String jobExecutionId;
+    try {
+      jobExecutionId = jobExecutor.submitJob(jobTemplate);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          String.format("Failed to submit job template %s for execution", jobTemplate), e);
+    }
+
+    // Create a new JobEntity to represent the job
+    JobEntity jobEntity =
+        JobEntity.builder()
+            .withId(jobId)
+            .withJobExecutionId(jobExecutionId)
+            .withJobTemplateName(jobTemplateName)
+            .withStatus(JobHandle.Status.QUEUED)
+            .withNamespace(NamespaceUtil.ofJob(metalake))
+            .withAuditInfo(
+                AuditInfo.builder()
+                    .withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+                    .withCreateTime(Instant.now())
+                    .build())
+            .build();
+
+    try {
+      entityStore.put(jobEntity, false /* overwrite */);

Review Comment:
   It seems that the job-to-template relation operation is unnecessary? See
   https://github.com/apache/gravitino/blob/f06649a359a8e3bda30f68c571b01cc6374db504/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java#L40-L41



##########
core/src/main/java/org/apache/gravitino/job/JobExecutorFactory.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.job;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.connector.job.JobExecutor;
+
+public class JobExecutorFactory {
+
+  private static final String JOB_EXECUTOR_CONF_PREFIX = "gravitino.executor.";
+
+  private static final String JOB_EXECUTOR_CLASS_SUFFIX = ".class";
+
+  private JobExecutorFactory() {
+    // Private constructor to prevent instantiation
+  }
+
+  public static JobExecutor create(Config config) {
+    String jobExecutorName = config.get(Configs.JOB_EXECUTOR);
+    String jobExecutorClassKey =
+        JOB_EXECUTOR_CONF_PREFIX + jobExecutorName + JOB_EXECUTOR_CLASS_SUFFIX;
+    String clzName = config.getRawString(jobExecutorClassKey);

Review Comment:
   What is the class name for the `local` executor? Do you plan to add it in another
   PR, or add an empty class in this PR?
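
   For context, the lookup quoted above resolves the executor name to a configuration key by plain concatenation. A standalone sketch using the two constants from the diff (`ExecutorKeySketch` and `classKeyFor` are hypothetical names) shows which key a `local` executor would be looked up under. It says nothing about which class that key should point to:

```java
/** Hypothetical helper mirroring the key construction in the quoted diff. */
final class ExecutorKeySketch {
  // Both constants are copied from the quoted JobExecutorFactory code.
  private static final String JOB_EXECUTOR_CONF_PREFIX = "gravitino.executor.";
  private static final String JOB_EXECUTOR_CLASS_SUFFIX = ".class";

  private ExecutorKeySketch() {}

  // Returns the configuration key under which the executor class name is expected.
  static String classKeyFor(String jobExecutorName) {
    return JOB_EXECUTOR_CONF_PREFIX + jobExecutorName + JOB_EXECUTOR_CLASS_SUFFIX;
  }
}
```

   So for `local`, the factory would read `gravitino.executor.local.class`, and that key needs a value (or a default) for the lookup to succeed.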



##########
core/src/main/java/org/apache/gravitino/job/JobManager.java:
##########
@@ -226,8 +267,243 @@ public JobEntity getJob(String metalake, String jobId) throws NoSuchJobException
         });
   }
 
+  @Override
+  public JobEntity runJob(String metalake, String jobTemplateName, Map<String, String> jobConf)
+      throws NoSuchJobTemplateException {
+    checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
+
+    // Check if the job template exists, will throw NoSuchJobTemplateException if it does not exist.
+    JobTemplateEntity jobTemplateEntity = getJobTemplate(metalake, jobTemplateName);
+
+    // Create staging directory.
+    // TODO(jerry). The job staging directory will be deleted using a background thread.
+    long jobId = idGenerator.nextId();
+    File jobStagingDir = new File(stagingDir, JobHandle.JOB_ID_PREFIX + jobId);
+    if (!jobStagingDir.mkdirs()) {
+      throw new RuntimeException(
+          String.format("Failed to create staging directory %s for job %s", jobStagingDir, jobId));
+    }
+
+    // Create a JobTemplate by replacing the template parameters with the jobConf values, and
+    // also downloading any necessary files from the URIs specified in the job template.
+    JobTemplate jobTemplate = createRuntimeJobTemplate(jobTemplateEntity, jobConf, jobStagingDir);
+
+    // Submit the job template to the job executor
+    String jobExecutionId;

Review Comment:
   What is the difference between `jobExecutionId` and `jobId`? If
   `jobExecutionId` is used for tracking, should the `jobStagingDir` also be
   named after it instead of `jobId`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
