Thanks. BTW, getAddress() and getHostName() methods are somewhat duplicated. If we remove one of the two, we have to change all code, related with it.
https://issues.apache.org/jira/browse/HAMA-316 Could you please comment here? On Wed, Oct 20, 2010 at 6:31 PM, Filipe David Manana <[email protected]> wrote: > A big +1 on this one :) > > On Wed, Oct 20, 2010 at 2:48 AM, <[email protected]> wrote: >> Author: edwardyoon >> Date: Wed Oct 20 01:48:36 2010 >> New Revision: 1024485 >> >> URL: http://svn.apache.org/viewvc?rev=1024485&view=rev >> Log: >> Refactoring launchTask() method in GroomServer >> >> Added: >> incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java >> Modified: >> incubator/hama/trunk/CHANGES.txt >> >> incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java >> incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java >> incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java >> incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java >> incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java >> incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java >> incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java >> incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java >> incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java >> >> Modified: incubator/hama/trunk/CHANGES.txt >> URL: >> http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1024485&r1=1024484&r2=1024485&view=diff >> ============================================================================== >> --- incubator/hama/trunk/CHANGES.txt (original) >> +++ incubator/hama/trunk/CHANGES.txt Wed Oct 20 01:48:36 2010 >> @@ -50,6 +50,7 @@ Trunk (unreleased changes) >> >> IMPROVEMENTS >> >> + HAMA-300: Refactoring launchTask() method in GroomServer (edwardyoon) >> HAMA-311: Add unit tests for IPC package (edwardyoon) >> HAMA-312: Add serialize printing to ExampleDriver (edwardyoon) >> HAMA-309: Add unit tests for Bytes utilities (edwardyoon) >> >> Modified: >> incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java >> URL: >> http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1024485&r1=1024484&r2=1024485&view=diff >> ============================================================================== >> --- >> incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java >> (original) >> +++ >> incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java >> Wed Oct 20 01:48:36 2010 >> @@ -54,7 +54,7 @@ public class PiEstimator { >> } >> } >> >> - byte[] tagName = Bytes.toBytes(getName().toString()); >> + byte[] tagName = Bytes.toBytes(bspPeer.getHostName()); >> byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations); >> BSPMessage estimate = new BSPMessage(tagName, myData); >> >> >> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java >> URL: >> http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java?rev=1024485&r1=1024484&r2=1024485&view=diff >> ============================================================================== >> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java (original) >> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java Wed Oct 20 >> 01:48:36 2010 >> @@ -20,20 +20,5 @@ package org.apache.hama.bsp; >> /** >> * This class provides an abstract implementation of the BSP interface >> */ >> -public abstract class BSP extends Thread implements BSPInterface { >> - private BSPPeer bspPeer; >> - >> - /** >> - * A thread's run method. >> - * >> - * The run method performs the >> - * {...@link org.apache.hama.bsp.BSPInterface#bsp(BSPPeer)} >> - */ >> - public void runBSP() throws Exception { >> - bsp(bspPeer); >> - } >> - >> - public void setPeer(BSPPeer bspServer) { >> - this.bspPeer = bspServer; >> - } >> +public abstract class BSP implements BSPInterface { >> } >> >> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java >> URL: >> http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java?rev=1024485&r1=1024484&r2=1024485&view=diff >> ============================================================================== >> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java >> (original) >> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java Wed >> Oct 20 01:48:36 2010 >> @@ -30,6 +30,12 @@ public class BSPMessage implements Writa >> public BSPMessage() { >> } >> >> + /** >> + * Constructor >> + * >> + * @param tag of data >> + * @param data of message >> + */ >> public BSPMessage(byte[] tag, byte[] data) { >> this.tag = new byte[tag.length]; >> this.data = new byte[data.length]; >> @@ -37,11 +43,20 @@ public class BSPMessage implements Writa >> System.arraycopy(data, 0, this.data, 0, data.length); >> } >> >> + /** >> + * BSP messages are typically identified with tags. This allows to get >> the tag >> + * of data. >> + * >> + * @return tag of data of BSP message >> + */ >> public byte[] getTag() { >> byte[] result = this.tag; >> return result; >> } >> >> + /** >> + * @return data of BSP message >> + */ >> public byte[] getData() { >> byte[] result = this.data; >> return result; >> >> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java >> URL: >> http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java?rev=1024485&r1=1024484&r2=1024485&view=diff >> ============================================================================== >> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java (original) >> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java Wed Oct >> 20 01:48:36 2010 >> @@ -17,26 +17,21 @@ >> */ >> package org.apache.hama.bsp; >> >> -import org.apache.hadoop.conf.Configuration; >> -import org.apache.hadoop.util.ReflectionUtils; >> - >> public class BSPTask extends Task { >> - private BSP bsp; >> - private Configuration conf; >> - >> - public BSPTask(BSPJobID jobId, String jobFile, String taskid, int >> partition, Configuration conf) { >> + >> + public BSPTask() { >> + } >> + >> + public BSPTask(BSPJobID jobId, String jobFile, String taskid, int >> partition) { >> this.jobId = jobId; >> this.jobFile = jobFile; >> this.taskId = taskid; >> this.partition = partition; >> - this.conf = conf; >> } >> >> - public BSP getBSPClass() { >> - bsp = (BSP) ReflectionUtils.newInstance(conf.getClass("bsp.work.class", >> - BSP.class), conf); >> - >> - return bsp; >> + �...@override >> + public BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob conf) { >> + return new BSPTaskRunner(this, bspPeer, conf); >> } >> >> } >> >> Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java >> URL: >> http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java?rev=1024485&view=auto >> ============================================================================== >> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java >> (added) >> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java Wed >> Oct 20 01:48:36 2010 >> @@ -0,0 +1,62 @@ >> +/** >> + * Licensed to the Apache Software Foundation (ASF) under one >> + * or more contributor license agreements. See the NOTICE file >> + * distributed with this work for additional information >> + * regarding copyright ownership. The ASF licenses this file >> + * to you under the Apache License, Version 2.0 (the >> + * "License"); you may not use this file except in compliance >> + * with the License. You may obtain a copy of the License at >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. >> + * See the License for the specific language governing permissions and >> + * limitations under the License. >> + */ >> +package org.apache.hama.bsp; >> + >> +import java.io.IOException; >> + >> +import org.apache.commons.logging.Log; >> +import org.apache.commons.logging.LogFactory; >> +import org.apache.hadoop.util.ReflectionUtils; >> +import org.apache.zookeeper.KeeperException; >> + >> +public class BSPTaskRunner extends Thread { >> + >> + public static final Log LOG = LogFactory.getLog(BSPTaskRunner.class); >> + private Task task; >> + private BSPJob conf; >> + private BSPPeer bspPeer; >> + >> + public BSPTaskRunner(BSPTask bspTask, BSPPeer bspPeer, BSPJob conf) { >> + this.task = bspTask; >> + this.conf = conf; >> + this.bspPeer = bspPeer; >> + } >> + >> + public Task getTask() { >> + return task; >> + } >> + >> + public void run() { >> + BSP bsp = (BSP) ReflectionUtils.newInstance(conf.getConf().getClass( >> + "bsp.work.class", BSP.class), conf.getConf()); >> + >> + try { >> + bsp.bsp(bspPeer); >> + } catch (IOException e) { >> + // TODO Auto-generated catch block >> + e.printStackTrace(); >> + } catch (KeeperException e) { >> + // TODO Auto-generated catch block >> + e.printStackTrace(); >> + } catch (InterruptedException e) { >> + // TODO Auto-generated catch block >> + e.printStackTrace(); >> + } >> + } >> + >> +} >> >> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java >> URL: >> http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1024485&r1=1024484&r2=1024485&view=diff >> ============================================================================== >> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java >> (original) >> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Wed >> Oct 20 01:48:36 2010 >> @@ -23,9 +23,11 @@ import java.lang.reflect.Constructor; >> import java.net.InetSocketAddress; >> import java.util.ArrayList; >> import java.util.HashMap; >> +import java.util.HashSet; >> import java.util.LinkedHashMap; >> import java.util.List; >> import java.util.Map; >> +import java.util.Set; >> import java.util.TreeMap; >> import java.util.concurrent.BlockingQueue; >> import java.util.concurrent.LinkedBlockingQueue; >> @@ -40,7 +42,7 @@ import org.apache.hadoop.ipc.RPC; >> import org.apache.hadoop.ipc.RemoteException; >> import org.apache.hadoop.net.DNS; >> import org.apache.hadoop.util.DiskChecker; >> -import org.apache.hadoop.util.ReflectionUtils; >> +import org.apache.hadoop.util.RunJar; >> import org.apache.hadoop.util.StringUtils; >> import org.apache.hadoop.util.DiskChecker.DiskErrorException; >> import org.apache.hama.Constants; >> @@ -50,6 +52,7 @@ import org.apache.hama.ipc.InterTrackerP >> public class GroomServer implements Runnable { >> public static final Log LOG = LogFactory.getLog(GroomServer.class); >> private static BSPPeer bspPeer; >> + static final String SUBDIR = "groomServer"; >> >> Configuration conf; >> >> @@ -281,9 +284,111 @@ public class GroomServer implements Runn >> } >> >> try { >> + localizeJob(tip); >> + } catch (Throwable e) { >> + String msg = ("Error initializing " + tip.getTask().getTaskID() + >> ":\n" + StringUtils >> + .stringifyException(e)); >> + LOG.warn(msg); >> + } >> + } >> + >> + private void localizeJob(TaskInProgress tip) throws IOException { >> + Task task = tip.getTask(); >> + conf.addResource(task.getJobFile()); >> + BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf); >> + >> + Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/" >> + + task.getTaskID() + "/" + "job.xml"); >> + >> + RunningJob rjob = addTaskToJob(task.getJobID(), localJobFile, tip); >> + BSPJob jobConf = null; >> + >> + synchronized (rjob) { >> + if (!rjob.localized) { >> + Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/" >> + + task.getTaskID() + "/" + "job.jar"); >> + systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile); >> + Path jarFile = new Path(task.getJobFile().replace(".xml", ".jar")); >> + >> + HamaConfiguration conf = new HamaConfiguration(); >> + conf.addResource(localJobFile); >> + jobConf = new BSPJob(conf, task.getJobID().toString()); >> + jobConf.setJar(localJarFile.toString()); >> + >> + if (jarFile != null) { >> + systemFS.copyToLocalFile(jarFile, localJarFile); >> + >> + // also unjar the job.jar files in workdir >> + File workDir = new File( >> + new File(localJobFile.toString()).getParent(), "work"); >> + if (!workDir.mkdirs()) { >> + if (!workDir.isDirectory()) { >> + throw new IOException("Mkdirs failed to create " >> + + workDir.toString()); >> + } >> + } >> + RunJar.unJar(new File(localJarFile.toString()), workDir); >> + } >> + rjob.localized = true; >> + } >> + } >> + launchTaskForJob(tip, jobConf); >> + } >> + >> + private void launchTaskForJob(TaskInProgress tip, BSPJob jobConf) { >> + try { >> + tip.setJobConf(jobConf); >> tip.launchTask(); >> } catch (Throwable ie) { >> - // TODO: when job failed. >> + tip.taskStatus.setRunState(TaskStatus.State.FAILED); >> + String error = StringUtils.stringifyException(ie); >> + LOG.info(error); >> + } >> + } >> + >> + private RunningJob addTaskToJob(BSPJobID jobId, Path localJobFile, >> + TaskInProgress tip) { >> + synchronized (runningJobs) { >> + RunningJob rJob = null; >> + if (!runningJobs.containsKey(jobId)) { >> + rJob = new RunningJob(jobId, localJobFile); >> + rJob.localized = false; >> + rJob.tasks = new HashSet<TaskInProgress>(); >> + rJob.jobFile = localJobFile; >> + runningJobs.put(jobId, rJob); >> + } else { >> + rJob = runningJobs.get(jobId); >> + } >> + rJob.tasks.add(tip); >> + return rJob; >> + } >> + } >> + >> + /** >> + * The datastructure for initializing a job >> + */ >> + static class RunningJob { >> + private BSPJobID jobid; >> + private Path jobFile; >> + // keep this for later use >> + Set<TaskInProgress> tasks; >> + boolean localized; >> + boolean keepJobFiles; >> + >> + RunningJob(BSPJobID jobid, Path jobFile) { >> + this.jobid = jobid; >> + localized = false; >> + tasks = new HashSet<TaskInProgress>(); >> + this.jobFile = jobFile; >> + keepJobFiles = false; >> + } >> + >> + Path getJobFile() { >> + return jobFile; >> + } >> + >> + BSPJobID getJobId() { >> + return jobid; >> } >> } >> >> @@ -410,6 +515,8 @@ public class GroomServer implements Runn >> // ///////////////////////////////////////////////////// >> class TaskInProgress { >> Task task; >> + BSPJob jobConf; >> + private BSPTaskRunner runner; >> volatile boolean done = false; >> volatile boolean wasKilled = false; >> private TaskStatus taskStatus; >> @@ -421,61 +528,29 @@ public class GroomServer implements Runn >> TaskStatus.Phase.STARTING); >> } >> >> - static final String SUBDIR = "groomServer"; >> + public void setJobConf(BSPJob jobConf) { >> + this.jobConf = jobConf; >> + } >> >> - public void launchTask() { >> + public void launchTask() throws IOException { >> taskStatus.setRunState(TaskStatus.State.RUNNING); >> + this.runner = task.createRunner(bspPeer, this.jobConf); >> + this.runner.start(); >> >> - try { >> - // TODO: need to move this code to TaskRunner >> - >> - task.getJobFile(); >> - conf.addResource(task.getJobFile()); >> - BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf); >> - >> - Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/" >> - + task.getTaskID() + "/" + "job.xml"); >> - Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/" >> - + task.getTaskID() + "/" + "job.jar"); >> - >> - systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile); >> - systemFS.copyToLocalFile(new Path(task.getJobFile().replace(".xml", >> - ".jar")), localJarFile); >> - >> - HamaConfiguration conf = new HamaConfiguration(); >> - conf.addResource(localJobFile); >> - BSPJob jobConf = new BSPJob(conf, task.getJobID().toString()); >> - jobConf.setJar(localJarFile.toString()); >> - >> - BSP bsp = (BSP) ReflectionUtils >> - .newInstance(jobConf.getBspClass(), conf); >> - bsp.setPeer(bspPeer); >> + // Check state of Task >> + while (true) { >> try { >> - bsp.runBSP(); >> - } catch (Exception e) { >> + Thread.sleep(1000); >> + } catch (InterruptedException e) { >> e.printStackTrace(); >> - taskStatus.setRunState(TaskStatus.State.FAILED); >> } >> >> - } catch (IOException e) { >> - // TODO Auto-generated catch block >> - e.printStackTrace(); >> - } finally { >> - >> - while (true) { >> - try { >> - Thread.sleep(1000); >> - } catch (InterruptedException e) { >> - e.printStackTrace(); >> - } >> - >> - // If local/outgoing queues are empty, task is done. >> - if (bspPeer.localQueue.size() == 0 >> - && bspPeer.outgoingQueues.size() == 0) { >> - taskStatus.setRunState(TaskStatus.State.SUCCEEDED); >> - acceptNewTasks = true; >> - break; >> - } >> + // If local/outgoing queues are empty, task is done. >> + if (bspPeer.localQueue.size() == 0 >> + && bspPeer.outgoingQueues.size() == 0) { >> + taskStatus.setRunState(TaskStatus.State.SUCCEEDED); >> + acceptNewTasks = true; >> + break; >> } >> } >> >> >> Modified: >> incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java >> URL: >> http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java?rev=1024485&r1=1024484&r2=1024485&view=diff >> ============================================================================== >> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java >> (original) >> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java >> Wed Oct 20 01:48:36 2010 >> @@ -47,7 +47,7 @@ class LaunchTaskAction extends GroomServ >> } >> >> public void readFields(DataInput in) throws IOException { >> - task = new Task(); >> + task = new BSPTask(); >> task.readFields(in); >> } >> >> >> Modified: >> incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java >> URL: >> http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java?rev=1024485&r1=1024484&r2=1024485&view=diff >> ============================================================================== >> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java >> (original) >> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java >> Wed Oct 20 01:48:36 2010 >> @@ -172,7 +172,7 @@ public class LocalJobRunner implements J >> >> try { >> GroomServer servers = new GroomServer(conf); >> - Task task = new BSPTask(job.getJobID(), jobFile, >> tID.toString(), i, this.conf); >> + Task task = new BSPTask(job.getJobID(), jobFile, >> tID.toString(), i); >> >> // TODO not yet implemented >> >> >> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java >> URL: >> http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=1024485&r1=1024484&r2=1024485&view=diff >> ============================================================================== >> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (original) >> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Wed Oct 20 >> 01:48:36 2010 >> @@ -23,14 +23,12 @@ import java.io.IOException; >> >> import org.apache.commons.logging.Log; >> import org.apache.commons.logging.LogFactory; >> +import org.apache.hadoop.conf.Configuration; >> import org.apache.hadoop.fs.LocalDirAllocator; >> import org.apache.hadoop.io.Text; >> import org.apache.hadoop.io.Writable; >> >> -/** >> - * >> - */ >> -public class Task implements Writable { >> +public abstract class Task implements Writable { >> public static final Log LOG = LogFactory.getLog(Task.class); >> //////////////////////////////////////////// >> // Fields >> @@ -109,5 +107,7 @@ public class Task implements Writable { >> taskId = Text.readString(in); >> partition = in.readInt(); >> } >> + >> + public abstract BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob >> jobConf); >> >> } >> >> Modified: >> incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java >> URL: >> http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=1024485&r1=1024484&r2=1024485&view=diff >> ============================================================================== >> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java >> (original) >> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java >> Wed Oct 20 01:48:36 2010 >> @@ -101,7 +101,7 @@ class TaskInProgress { >> return null; >> } >> >> - t = new BSPTask(jobId, jobFile, taskid, partition, this.conf); >> + t = new BSPTask(jobId, jobFile, taskid, partition); >> activeTasks.put(taskid, status.getGroomName()); >> >> // Ask JobTracker to note that the task exists >> >> >> > > > > -- > Filipe David Manana, > [email protected], [email protected] > > "Reasonable men adapt themselves to the world. > Unreasonable men adapt the world to themselves. > That's why all progress depends on unreasonable men." > -- Best Regards, Edward J. Yoon [email protected] http://blog.udanax.org
