kasakrisz closed pull request #9: AMBARI-24807 - Infra Manager: unable to restart job. URL: https://github.com/apache/ambari-infra/pull/9
This is a PR merged from a forked repository. Because GitHub hides the original diff of a foreign (forked) pull request on merge, the diff is reproduced below for the sake of provenance: diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java index 8729c4e2..f63ea581 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java @@ -63,12 +63,14 @@ public void schedule(String jobName, SchedulingProperties schedulingProperties) } private void restartIfFailed(JobExecution jobExecution) { - if (jobExecution.getExitStatus() == ExitStatus.FAILED) { - try { + try { + if (ExitStatus.FAILED.compareTo(jobExecution.getExitStatus()) == 0) { jobs.restart(jobExecution.getId()); - } catch (JobInstanceAlreadyCompleteException | NoSuchJobException | JobExecutionAlreadyRunningException | JobRestartException | JobParametersInvalidException | NoSuchJobExecutionException e) { - throw new RuntimeException(e); + } else if (ExitStatus.UNKNOWN.compareTo(jobExecution.getExitStatus()) == 0) { + jobs.abandon(jobExecution.getId()); } + } catch (JobInstanceAlreadyCompleteException | NoSuchJobException | JobExecutionAlreadyRunningException | JobRestartException | JobParametersInvalidException | NoSuchJobExecutionException e) { + throw new RuntimeException(e); } } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java index f35387d4..ac8cd722 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java @@ -18,7 +18,19 @@ */ package 
org.apache.ambari.infra.manager; -import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TimeZone; + +import javax.inject.Inject; +import javax.inject.Named; + import org.apache.ambari.infra.model.ExecutionContextResponse; import org.apache.ambari.infra.model.JobDetailsResponse; import org.apache.ambari.infra.model.JobExecutionDetailsResponse; @@ -50,17 +62,7 @@ import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRestartException; -import javax.inject.Inject; -import javax.inject.Named; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.TimeZone; +import com.google.common.collect.Lists; @Named public class JobManager implements Jobs { @@ -110,6 +112,11 @@ public void restart(Long jobExecutionId) return jobService.listJobExecutionsForJob(jobName, 0, 1).stream().findFirst(); } + @Override + public void abandon(Long jobExecution) throws NoSuchJobExecutionException, JobExecutionAlreadyRunningException { + jobService.abandon(jobExecution); + } + /** * Get all executions ids that mapped to specific job name, */ diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/Jobs.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/Jobs.java index b2ca605b..5e435e87 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/Jobs.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/Jobs.java @@ -18,6 +18,8 @@ */ package org.apache.ambari.infra.manager; +import java.util.Optional; + import 
org.apache.ambari.infra.model.JobExecutionInfoResponse; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParameters; @@ -28,8 +30,6 @@ import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRestartException; -import java.util.Optional; - public interface Jobs { JobExecutionInfoResponse launchJob(String jobName, JobParameters params) throws JobParametersInvalidException, NoSuchJobException, @@ -39,4 +39,6 @@ void restart(Long jobExecutionId) JobParametersInvalidException, JobRestartException, NoSuchJobExecutionException; Optional<JobExecution> lastRun(String jobName) throws NoSuchJobException, NoSuchJobExecutionException; + + void abandon(Long jobExecution) throws NoSuchJobExecutionException, JobExecutionAlreadyRunningException; } diff --git a/ambari-infra-manager/src/main/resources/infra-manager.properties b/ambari-infra-manager/src/main/resources/infra-manager.properties index d7bdc29b..50c43e34 100644 --- a/ambari-infra-manager/src/main/resources/infra-manager.properties +++ b/ambari-infra-manager/src/main/resources/infra-manager.properties @@ -83,6 +83,6 @@ infra-manager.jobs.solr_data_deleting.delete_audit_logs.enabled=true infra-manager.jobs.solr_data_deleting.delete_audit_logs.zoo_keeper_connection_string=zookeeper:2181 infra-manager.jobs.solr_data_deleting.delete_audit_logs.collection=audit_logs infra-manager.jobs.solr_data_deleting.delete_audit_logs.filter_field=evtTime -infra-manager.jobs.clean-up.ttl=PT24H +infra-manager.jobs.clean-up.ttl=PT240H infra-manager.jobs.clean-up.scheduling.enabled=true infra-manager.jobs.clean-up.scheduling.cron=0 * * * * ? 
diff --git a/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobSchedulerTest.java b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobSchedulerTest.java index ba1150f7..68807409 100644 --- a/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobSchedulerTest.java +++ b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobSchedulerTest.java @@ -1,5 +1,15 @@ package org.apache.ambari.infra.job; +import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.isA; + +import java.util.Optional; +import java.util.concurrent.ScheduledFuture; + +import javax.batch.operations.NoSuchJobException; + import org.apache.ambari.infra.manager.Jobs; import org.easymock.EasyMockRunner; import org.easymock.EasyMockSupport; @@ -14,15 +24,6 @@ import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.support.CronTrigger; -import javax.batch.operations.NoSuchJobException; -import java.util.Optional; -import java.util.concurrent.ScheduledFuture; - -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.easymock.EasyMock.isA; - /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file @@ -111,4 +112,19 @@ public void testScheduleWhenPreviousExecutionFailedJobIsRestartedAndScheduled() jobScheduler.schedule(jobName, schedulingProperties); } + + @Test + public void testScheduleWhenPreviousExecutionIsUnknownJobIsAbandonedAndScheduled() throws Exception { + String jobName = "job0"; + SchedulingProperties schedulingProperties = new SchedulingProperties(); + schedulingProperties.setCron("* * * * * ?"); + JobExecution jobExecution = new JobExecution(1L, new JobParameters()); + jobExecution.setExitStatus(ExitStatus.UNKNOWN); + expect(jobs.lastRun(jobName)).andReturn(Optional.of(jobExecution)); + jobs.abandon(1L); expectLastCall(); + expect(taskScheduler.schedule(isA(Runnable.class), eq(new CronTrigger(schedulingProperties.getCron())))).andReturn(scheduledFuture); + replayAll(); + + jobScheduler.schedule(jobName, schedulingProperties); + } } \ No newline at end of file ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services