Author: bobby Date: Thu Jun 28 19:59:19 2012 New Revision: 1355139 URL: http://svn.apache.org/viewvc?rev=1355139&view=rev Log: MAPREDUCE-4371. Check for cyclic dependencies in Jobcontrol job DAG (madhukara phatak via bobby)
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/jobcontrol/ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/jobcontrol/TestJobControl.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1355139&r1=1355138&r2=1355139&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Jun 28 19:59:19 2012 @@ -123,6 +123,9 @@ Trunk (unreleased changes) MAPREDUCE-3868. Make Raid Compile. (Weiyan Wang via schen) + MAPREDUCE-4371. Check for cyclic dependencies in Jobcontrol job DAG + (madhukara phatak via bobby) + Branch-2 ( Unreleased changes ) INCOMPATIBLE CHANGES Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java?rev=1355139&r1=1355138&r2=1355139&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java Thu Jun 28 19:59:19 2012 @@ -24,6 +24,8 @@ import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.HashMap; +import java.util.HashSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -215,6 +217,10 @@ public class JobControl implements Runna } synchronized(this) { + if (isCircular(jobsInProgress)) { + throw new IllegalArgumentException( + "job control has circular dependency"); + } Iterator<ControlledJob> it = jobsInProgress.iterator(); while(it.hasNext()) { ControlledJob j = it.next(); @@ -281,4 +287,64 @@ public class JobControl implements Runna } } } + + /** + * Uses topological sorting algorithm for finding circular dependency + */ + private boolean isCircular(final List<ControlledJob> jobList) { + boolean cyclePresent = false; + HashSet<ControlledJob> SourceSet = new HashSet<ControlledJob>(); + HashMap<ControlledJob, List<ControlledJob>> processedMap = + new HashMap<ControlledJob, List<ControlledJob>>(); + for (ControlledJob n : jobList) { + processedMap.put(n, new ArrayList<ControlledJob>()); + } + for (ControlledJob n : jobList) { + if (!hasInComingEdge(n, jobList, processedMap)) { + SourceSet.add(n); + } + } + while (!SourceSet.isEmpty()) { + ControlledJob controlledJob = SourceSet.iterator().next(); + SourceSet.remove(controlledJob); + if (controlledJob.getDependentJobs() != null) { + for (int i = 0; i < controlledJob.getDependentJobs().size(); i++) { + ControlledJob depenControlledJob = + controlledJob.getDependentJobs().get(i); + processedMap.get(controlledJob).add(depenControlledJob); + if (!hasInComingEdge(controlledJob, jobList, processedMap)) { + SourceSet.add(depenControlledJob); + } + } + } + } + + for (ControlledJob controlledJob : jobList) { + if (controlledJob.getDependentJobs() != null + && controlledJob.getDependentJobs().size() != processedMap.get( + controlledJob).size()) { + cyclePresent = true; + LOG.error("Job control has circular dependency for the job " + + controlledJob.getJobName()); + break; + } + } + return cyclePresent; + } + + private boolean hasInComingEdge(ControlledJob controlledJob, + List<ControlledJob> controlledJobList, + HashMap<ControlledJob, List<ControlledJob>> processedMap) { + boolean hasIncomingEdge = false; + for (ControlledJob k : controlledJobList) { + if (k != controlledJob && k.getDependentJobs() != null + && !processedMap.get(k).contains(controlledJob) + && k.getDependentJobs().contains(controlledJob)) { + hasIncomingEdge = true; + break; + } + } + return hasIncomingEdge; + + } } Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/jobcontrol/TestJobControl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/jobcontrol/TestJobControl.java?rev=1355139&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/jobcontrol/TestJobControl.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/jobcontrol/TestJobControl.java Thu Jun 28 19:59:19 2012 @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.lib.jobcontrol; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +public class TestJobControl { + + /* + * Tests for the circular dependency between the jobs in job control. If there + * is a circular dependency, all the jobs in job control will be failed. + */ + @Test + public void testCircularDependency() throws IOException { + ControlledJob job1 = new ControlledJob(new Configuration()); + job1.setJobName("job1"); + ControlledJob job2 = new ControlledJob(new Configuration()); + job2.setJobName("job2"); + ControlledJob job3 = new ControlledJob(new Configuration()); + job3.setJobName("job3"); + job1.addDependingJob(job2); + job2.addDependingJob(job3); + job3.addDependingJob(job1); + JobControl jobControl = new JobControl("test"); + jobControl.addJob(job1); + jobControl.addJob(job2); + jobControl.addJob(job3); + jobControl.run(); + + // assert that all jobs are failed due to cyclic dependency + List<ControlledJob> failedJobs = jobControl.getFailedJobList(); + assertTrue(failedJobs.contains(job1)); + assertTrue(failedJobs.contains(job2)); + assertTrue(failedJobs.contains(job3)); + + } +} +