Author: cutting
Date: Thu Jul 28 10:11:15 2005
New Revision: 225827
URL: http://svn.apache.org/viewcvs?rev=225827&view=rev
Log: Add task state to progress reports, ignored for now. Also make task progress reporting a bit more thread-friendly.
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Task.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskUmbilicalProtocol.java Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java?rev=225827&r1=225826&r2=225827&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java Thu Jul 28 10:11:15 2005 @@ -101,13 +101,13 @@ public Task getTask(String taskid) { return null; } - public void progress(String taskId, FloatWritable progress) { + public void progress(String taskId, float progress, String state) { float taskIndex = mapIds.indexOf(taskId); if (taskIndex >= 0) { // mapping float numTasks = mapIds.size(); - status.mapProgress = (taskIndex/numTasks)+(progress.get()/numTasks); + status.mapProgress = (taskIndex/numTasks)+(progress/numTasks); } else { - status.reduceProgress = progress.get(); + status.reduceProgress = progress; } } Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Task.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Task.java?rev=225827&r1=225826&r2=225827&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Task.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Task.java Thu Jul 28 10:11:15 2005 @@ -95,8 +95,10 @@ throws IOException { long now = System.currentTimeMillis(); if (now > 
nextProgressTime) { - umbilical.progress(getTaskId(), new FloatWritable(taskProgress.get())); - nextProgressTime = now + PROGRESS_INTERVAL; + synchronized (this) { + nextProgressTime = now + PROGRESS_INTERVAL; + umbilical.progress(getTaskId(), taskProgress.get(), ""); + } } } Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java?rev=225827&r1=225826&r2=225827&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java Thu Jul 28 10:11:15 2005 @@ -467,9 +467,9 @@ /** * Called periodically to report Task progress, from 0.0 to 1.0. */ - public void progress(String taskid, FloatWritable progress) throws IOException { + public void progress(String taskid, float progress, String state) throws IOException { TaskInProgress tip = (TaskInProgress) tasks.get(taskid); - tip.reportProgress(progress.get()); + tip.reportProgress(progress); } /** Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskUmbilicalProtocol.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskUmbilicalProtocol.java?rev=225827&r1=225826&r2=225827&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskUmbilicalProtocol.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskUmbilicalProtocol.java Thu Jul 28 10:11:15 2005 @@ -30,11 +30,15 @@ Task getTask(String taskid) throws IOException; /** Report child's progress to parent. 
+ * @param taskid the id of the task * @param progress value between zero and one + * @param state description of task's current state */ - void progress(String taskid, FloatWritable progress) throws IOException; + void progress(String taskid, float progress, String state) + throws IOException; - /** Report a child diagnostic message back to parent + /** Report error messages back to parent. Calls should be sparing, since all + * such messages are held in the job tracker. * @param trace, the stack trace text */ void reportDiagnosticInfo(String taskid, String trace) throws IOException;