A big +1 on this one :) On Wed, Oct 20, 2010 at 2:48 AM, <[email protected]> wrote: > Author: edwardyoon > Date: Wed Oct 20 01:48:36 2010 > New Revision: 1024485 > > URL: http://svn.apache.org/viewvc?rev=1024485&view=rev > Log: > Refactoring launchTask() method in GroomServer > > Added: > incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java > Modified: > incubator/hama/trunk/CHANGES.txt > incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java > incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java > incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java > incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java > incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java > incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java > incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java > incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java > incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java > > Modified: incubator/hama/trunk/CHANGES.txt > URL: > http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1024485&r1=1024484&r2=1024485&view=diff > ============================================================================== > --- incubator/hama/trunk/CHANGES.txt (original) > +++ incubator/hama/trunk/CHANGES.txt Wed Oct 20 01:48:36 2010 > @@ -50,6 +50,7 @@ Trunk (unreleased changes) > > IMPROVEMENTS > > + HAMA-300: Refactoring launchTask() method in GroomServer (edwardyoon) > HAMA-311: Add unit tests for IPC package (edwardyoon) > HAMA-312: Add serialize printing to ExampleDriver (edwardyoon) > HAMA-309: Add unit tests for Bytes utilities (edwardyoon) > > Modified: > incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java > URL: > http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1024485&r1=1024484&r2=1024485&view=diff > ============================================================================== > --- > incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java > (original) > +++ > incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java > Wed Oct 20 01:48:36 2010 > @@ -54,7 +54,7 @@ public class PiEstimator { > } > } > > - byte[] tagName = Bytes.toBytes(getName().toString()); > + byte[] tagName = Bytes.toBytes(bspPeer.getHostName()); > byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations); > BSPMessage estimate = new BSPMessage(tagName, myData); > > > Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java > URL: > http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java?rev=1024485&r1=1024484&r2=1024485&view=diff > ============================================================================== > --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java (original) > +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java Wed Oct 20 > 01:48:36 2010 > @@ -20,20 +20,5 @@ package org.apache.hama.bsp; > /** > * This class provides an abstract implementation of the BSP interface > */ > -public abstract class BSP extends Thread implements BSPInterface { > - private BSPPeer bspPeer; > - > - /** > - * A thread's run method. > - * > - * The run method performs the > - * {...@link org.apache.hama.bsp.BSPInterface#bsp(BSPPeer)} > - */ > - public void runBSP() throws Exception { > - bsp(bspPeer); > - } > - > - public void setPeer(BSPPeer bspServer) { > - this.bspPeer = bspServer; > - } > +public abstract class BSP implements BSPInterface { > } > > Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java > URL: > http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java?rev=1024485&r1=1024484&r2=1024485&view=diff > ============================================================================== > --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java > (original) > +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java Wed Oct > 20 01:48:36 2010 > @@ -30,6 +30,12 @@ public class BSPMessage implements Writa > public BSPMessage() { > } > > + /** > + * Constructor > + * > + * @param tag of data > + * @param data of message > + */ > public BSPMessage(byte[] tag, byte[] data) { > this.tag = new byte[tag.length]; > this.data = new byte[data.length]; > @@ -37,11 +43,20 @@ public class BSPMessage implements Writa > System.arraycopy(data, 0, this.data, 0, data.length); > } > > + /** > + * BSP messages are typically identified with tags. This allows to get the > tag > + * of data. > + * > + * @return tag of data of BSP message > + */ > public byte[] getTag() { > byte[] result = this.tag; > return result; > } > > + /** > + * @return data of BSP message > + */ > public byte[] getData() { > byte[] result = this.data; > return result; > > Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java > URL: > http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java?rev=1024485&r1=1024484&r2=1024485&view=diff > ============================================================================== > --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java (original) > +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java Wed Oct 20 > 01:48:36 2010 > @@ -17,26 +17,21 @@ > */ > package org.apache.hama.bsp; > > -import org.apache.hadoop.conf.Configuration; > -import org.apache.hadoop.util.ReflectionUtils; > - > public class BSPTask extends Task { > - private BSP bsp; > - private Configuration conf; > - > - public BSPTask(BSPJobID jobId, String jobFile, String taskid, int > partition, Configuration conf) { > + > + public BSPTask() { > + } > + > + public BSPTask(BSPJobID jobId, String jobFile, String taskid, int > partition) { > this.jobId = jobId; > this.jobFile = jobFile; > this.taskId = taskid; > this.partition = partition; > - this.conf = conf; > } > > - public BSP getBSPClass() { > - bsp = (BSP) ReflectionUtils.newInstance(conf.getClass("bsp.work.class", > - BSP.class), conf); > - > - return bsp; > + �...@override > + public BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob conf) { > + return new BSPTaskRunner(this, bspPeer, conf); > } > > } > > Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java > URL: > http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java?rev=1024485&view=auto > ============================================================================== > --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java > (added) > +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java Wed > Oct 20 01:48:36 2010 > @@ -0,0 +1,62 @@ > +/** > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > +package org.apache.hama.bsp; > + > +import java.io.IOException; > + > +import org.apache.commons.logging.Log; > +import org.apache.commons.logging.LogFactory; > +import org.apache.hadoop.util.ReflectionUtils; > +import org.apache.zookeeper.KeeperException; > + > +public class BSPTaskRunner extends Thread { > + > + public static final Log LOG = LogFactory.getLog(BSPTaskRunner.class); > + private Task task; > + private BSPJob conf; > + private BSPPeer bspPeer; > + > + public BSPTaskRunner(BSPTask bspTask, BSPPeer bspPeer, BSPJob conf) { > + this.task = bspTask; > + this.conf = conf; > + this.bspPeer = bspPeer; > + } > + > + public Task getTask() { > + return task; > + } > + > + public void run() { > + BSP bsp = (BSP) ReflectionUtils.newInstance(conf.getConf().getClass( > + "bsp.work.class", BSP.class), conf.getConf()); > + > + try { > + bsp.bsp(bspPeer); > + } catch (IOException e) { > + // TODO Auto-generated catch block > + e.printStackTrace(); > + } catch (KeeperException e) { > + // TODO Auto-generated catch block > + e.printStackTrace(); > + } catch (InterruptedException e) { > + // TODO Auto-generated catch block > + e.printStackTrace(); > + } > + } > + > +} > > Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java > URL: > http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1024485&r1=1024484&r2=1024485&view=diff > ============================================================================== > --- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java > (original) > +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Wed > Oct 20 01:48:36 2010 > @@ -23,9 +23,11 @@ import java.lang.reflect.Constructor; > import java.net.InetSocketAddress; > import java.util.ArrayList; > import java.util.HashMap; > +import java.util.HashSet; > import java.util.LinkedHashMap; > import java.util.List; > import java.util.Map; > +import java.util.Set; > import java.util.TreeMap; > import java.util.concurrent.BlockingQueue; > import java.util.concurrent.LinkedBlockingQueue; > @@ -40,7 +42,7 @@ import org.apache.hadoop.ipc.RPC; > import org.apache.hadoop.ipc.RemoteException; > import org.apache.hadoop.net.DNS; > import org.apache.hadoop.util.DiskChecker; > -import org.apache.hadoop.util.ReflectionUtils; > +import org.apache.hadoop.util.RunJar; > import org.apache.hadoop.util.StringUtils; > import org.apache.hadoop.util.DiskChecker.DiskErrorException; > import org.apache.hama.Constants; > @@ -50,6 +52,7 @@ import org.apache.hama.ipc.InterTrackerP > public class GroomServer implements Runnable { > public static final Log LOG = LogFactory.getLog(GroomServer.class); > private static BSPPeer bspPeer; > + static final String SUBDIR = "groomServer"; > > Configuration conf; > > @@ -281,9 +284,111 @@ public class GroomServer implements Runn > } > > try { > + localizeJob(tip); > + } catch (Throwable e) { > + String msg = ("Error initializing " + tip.getTask().getTaskID() + > ":\n" + StringUtils > + .stringifyException(e)); > + LOG.warn(msg); > + } > + } > + > + private void localizeJob(TaskInProgress tip) throws IOException { > + Task task = tip.getTask(); > + conf.addResource(task.getJobFile()); > + BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf); > + > + Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/" > + + task.getTaskID() + "/" + "job.xml"); > + > + RunningJob rjob = addTaskToJob(task.getJobID(), localJobFile, tip); > + BSPJob jobConf = null; > + > + synchronized (rjob) { > + if (!rjob.localized) { > + Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/" > + + task.getTaskID() + "/" + "job.jar"); > + systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile); > + Path jarFile = new Path(task.getJobFile().replace(".xml", ".jar")); > + > + HamaConfiguration conf = new HamaConfiguration(); > + conf.addResource(localJobFile); > + jobConf = new BSPJob(conf, task.getJobID().toString()); > + jobConf.setJar(localJarFile.toString()); > + > + if (jarFile != null) { > + systemFS.copyToLocalFile(jarFile, localJarFile); > + > + // also unjar the job.jar files in workdir > + File workDir = new File( > + new File(localJobFile.toString()).getParent(), "work"); > + if (!workDir.mkdirs()) { > + if (!workDir.isDirectory()) { > + throw new IOException("Mkdirs failed to create " > + + workDir.toString()); > + } > + } > + RunJar.unJar(new File(localJarFile.toString()), workDir); > + } > + rjob.localized = true; > + } > + } > + launchTaskForJob(tip, jobConf); > + } > + > + private void launchTaskForJob(TaskInProgress tip, BSPJob jobConf) { > + try { > + tip.setJobConf(jobConf); > tip.launchTask(); > } catch (Throwable ie) { > - // TODO: when job failed. > + tip.taskStatus.setRunState(TaskStatus.State.FAILED); > + String error = StringUtils.stringifyException(ie); > + LOG.info(error); > + } > + } > + > + private RunningJob addTaskToJob(BSPJobID jobId, Path localJobFile, > + TaskInProgress tip) { > + synchronized (runningJobs) { > + RunningJob rJob = null; > + if (!runningJobs.containsKey(jobId)) { > + rJob = new RunningJob(jobId, localJobFile); > + rJob.localized = false; > + rJob.tasks = new HashSet<TaskInProgress>(); > + rJob.jobFile = localJobFile; > + runningJobs.put(jobId, rJob); > + } else { > + rJob = runningJobs.get(jobId); > + } > + rJob.tasks.add(tip); > + return rJob; > + } > + } > + > + /** > + * The datastructure for initializing a job > + */ > + static class RunningJob { > + private BSPJobID jobid; > + private Path jobFile; > + // keep this for later use > + Set<TaskInProgress> tasks; > + boolean localized; > + boolean keepJobFiles; > + > + RunningJob(BSPJobID jobid, Path jobFile) { > + this.jobid = jobid; > + localized = false; > + tasks = new HashSet<TaskInProgress>(); > + this.jobFile = jobFile; > + keepJobFiles = false; > + } > + > + Path getJobFile() { > + return jobFile; > + } > + > + BSPJobID getJobId() { > + return jobid; > } > } > > @@ -410,6 +515,8 @@ public class GroomServer implements Runn > // ///////////////////////////////////////////////////// > class TaskInProgress { > Task task; > + BSPJob jobConf; > + private BSPTaskRunner runner; > volatile boolean done = false; > volatile boolean wasKilled = false; > private TaskStatus taskStatus; > @@ -421,61 +528,29 @@ public class GroomServer implements Runn > TaskStatus.Phase.STARTING); > } > > - static final String SUBDIR = "groomServer"; > + public void setJobConf(BSPJob jobConf) { > + this.jobConf = jobConf; > + } > > - public void launchTask() { > + public void launchTask() throws IOException { > taskStatus.setRunState(TaskStatus.State.RUNNING); > + this.runner = task.createRunner(bspPeer, this.jobConf); > + this.runner.start(); > > - try { > - // TODO: need to move this code to TaskRunner > - > - task.getJobFile(); > - conf.addResource(task.getJobFile()); > - BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf); > - > - Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/" > - + task.getTaskID() + "/" + "job.xml"); > - Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/" > - + task.getTaskID() + "/" + "job.jar"); > - > - systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile); > - systemFS.copyToLocalFile(new Path(task.getJobFile().replace(".xml", > - ".jar")), localJarFile); > - > - HamaConfiguration conf = new HamaConfiguration(); > - conf.addResource(localJobFile); > - BSPJob jobConf = new BSPJob(conf, task.getJobID().toString()); > - jobConf.setJar(localJarFile.toString()); > - > - BSP bsp = (BSP) ReflectionUtils > - .newInstance(jobConf.getBspClass(), conf); > - bsp.setPeer(bspPeer); > + // Check state of Task > + while (true) { > try { > - bsp.runBSP(); > - } catch (Exception e) { > + Thread.sleep(1000); > + } catch (InterruptedException e) { > e.printStackTrace(); > - taskStatus.setRunState(TaskStatus.State.FAILED); > } > > - } catch (IOException e) { > - // TODO Auto-generated catch block > - e.printStackTrace(); > - } finally { > - > - while (true) { > - try { > - Thread.sleep(1000); > - } catch (InterruptedException e) { > - e.printStackTrace(); > - } > - > - // If local/outgoing queues are empty, task is done. > - if (bspPeer.localQueue.size() == 0 > - && bspPeer.outgoingQueues.size() == 0) { > - taskStatus.setRunState(TaskStatus.State.SUCCEEDED); > - acceptNewTasks = true; > - break; > - } > + // If local/outgoing queues are empty, task is done. > + if (bspPeer.localQueue.size() == 0 > + && bspPeer.outgoingQueues.size() == 0) { > + taskStatus.setRunState(TaskStatus.State.SUCCEEDED); > + acceptNewTasks = true; > + break; > } > } > > > Modified: > incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java > URL: > http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java?rev=1024485&r1=1024484&r2=1024485&view=diff > ============================================================================== > --- incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java > (original) > +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java > Wed Oct 20 01:48:36 2010 > @@ -47,7 +47,7 @@ class LaunchTaskAction extends GroomServ > } > > public void readFields(DataInput in) throws IOException { > - task = new Task(); > + task = new BSPTask(); > task.readFields(in); > } > > > Modified: > incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java > URL: > http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java?rev=1024485&r1=1024484&r2=1024485&view=diff > ============================================================================== > --- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java > (original) > +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java Wed > Oct 20 01:48:36 2010 > @@ -172,7 +172,7 @@ public class LocalJobRunner implements J > > try { > GroomServer servers = new GroomServer(conf); > - Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(), > i, this.conf); > + Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(), > i); > > // TODO not yet implemented > > > Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java > URL: > http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=1024485&r1=1024484&r2=1024485&view=diff > ============================================================================== > --- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (original) > +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Wed Oct 20 > 01:48:36 2010 > @@ -23,14 +23,12 @@ import java.io.IOException; > > import org.apache.commons.logging.Log; > import org.apache.commons.logging.LogFactory; > +import org.apache.hadoop.conf.Configuration; > import org.apache.hadoop.fs.LocalDirAllocator; > import org.apache.hadoop.io.Text; > import org.apache.hadoop.io.Writable; > > -/** > - * > - */ > -public class Task implements Writable { > +public abstract class Task implements Writable { > public static final Log LOG = LogFactory.getLog(Task.class); > //////////////////////////////////////////// > // Fields > @@ -109,5 +107,7 @@ public class Task implements Writable { > taskId = Text.readString(in); > partition = in.readInt(); > } > + > + public abstract BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob > jobConf); > > } > > Modified: > incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java > URL: > http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=1024485&r1=1024484&r2=1024485&view=diff > ============================================================================== > --- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java > (original) > +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Wed > Oct 20 01:48:36 2010 > @@ -101,7 +101,7 @@ class TaskInProgress { > return null; > } > > - t = new BSPTask(jobId, jobFile, taskid, partition, this.conf); > + t = new BSPTask(jobId, jobFile, taskid, partition); > activeTasks.put(taskid, status.getGroomName()); > > // Ask JobTracker to note that the task exists > > >
-- Filipe David Manana, [email protected], [email protected] "Reasonable men adapt themselves to the world. Unreasonable men adapt the world to themselves. That's why all progress depends on unreasonable men."
