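Author: cutting Date: Thu Jul 28 11:15:22 2005 New Revision: 225837 URL: http://svn.apache.org/viewcvs?rev=225837&view=rev Log: Improved progress reporting during reduce: the append phase now reports per-file progress, and MapOutputFile's copy-progress calculation fixes an operator-precedence error (the old expression computed length - unread/length instead of (length - unread)/length).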
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskRunner.java Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java?rev=225837&r1=225836&r2=225837&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java Thu Jul 28 11:15:22 2005 @@ -112,6 +112,7 @@ // read the length-prefixed file content into a local file File file = getInputFile(mapTaskId, reduceTaskId); long length = in.readLong(); + float progPerByte = 1.0f / length; long unread = length; file.getParentFile().mkdirs(); // make directory OutputStream out = new FileOutputStream(file); @@ -123,7 +124,7 @@ out.write(buffer, 0, bytesToRead); unread -= bytesToRead; if (reporter != null) { - reporter.progress(length-unread/(float)length); + reporter.progress((length-unread)*progPerByte); } } } finally { Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java?rev=225837&r1=225836&r2=225837&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java Thu Jul 28 11:15:22 2005 @@ -158,6 +158,45 @@ copyPhase.complete(); // copy is already complete + // open a file to collect map output + File taskDir = new File(LOCAL_DIR, getTaskId()); + 
String file = new File(taskDir, "all.in").toString(); + SequenceFile.Writer writer = + new SequenceFile.Writer(lfs, file, keyClass, valueClass); + try { + // append all input files into a single input file + WritableComparable key = (WritableComparable)job.newInstance(keyClass); + Writable value = (Writable)job.newInstance(valueClass); + + for (int i = 0; i < mapTaskIds.length; i++) { + appendPhase.addPhase(); // one per file + } + + for (int i = 0; i < mapTaskIds.length; i++) { + File partFile = + MapOutputFile.getInputFile(mapTaskIds[i], getTaskId()); + float progPerByte = 1.0f / lfs.getLength(partFile); + Progress phase = appendPhase.phase(); + SequenceFile.Reader in = + new SequenceFile.Reader(lfs, partFile.toString()); + try { + while(in.next(key, value)) { + writer.append(key, value); + phase.set(in.getPosition()*progPerByte); + reportProgress(umbilical); + } + } finally { + in.close(); + } + phase.complete(); + } + + } finally { + writer.close(); + } + + appendPhase.complete(); // append is complete + // spawn a thread to give sort progress heartbeats Thread sortProgress = new Thread() { public void run() { @@ -175,44 +214,10 @@ }; sortProgress.setName("Sort progress reporter for task "+getTaskId()); - File taskDir = new File(LOCAL_DIR, getTaskId()); - String file = new File(taskDir, "all.in").toString(); String sortedFile = file+".sorted"; try { sortProgress.start(); - - // open a file to collect map output - SequenceFile.Writer writer = - new SequenceFile.Writer(lfs, file, keyClass, valueClass); - try { - // append all input files into a single input file - WritableComparable key = (WritableComparable)job.newInstance(keyClass); - Writable value = (Writable)job.newInstance(valueClass); - - for (int i = 0; i < mapTaskIds.length; i++) { - appendPhase.addPhase(); // one per file - } - - for (int i = 0; i < mapTaskIds.length; i++) { - String partFile = - MapOutputFile.getInputFile(mapTaskIds[i], getTaskId()).toString(); - SequenceFile.Reader in = new 
SequenceFile.Reader(lfs, partFile); - try { - while(in.next(key, value)) { - writer.append(key, value); - } - } finally { - in.close(); - } - appendPhase.startNextPhase(); - } - - } finally { - writer.close(); - } - - appendPhase.complete(); // append is complete // sort the input file WritableComparator comparator = Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskRunner.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskRunner.java?rev=225837&r1=225836&r2=225837&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskRunner.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskRunner.java Thu Jul 28 11:15:22 2005 @@ -90,6 +90,7 @@ // run java runChild(new String[] { jvm.toString(), + //"-Xrunhprof:cpu=samples,file="+t.getTaskId()+".prof", "-Xmx"+job.get("mapred.child.heap.size", "200m"), "-cp", classPath.toString(), TaskTracker.Child.class.getName(), // main is Child