Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
 Wed May 26 18:59:02 2010
@@ -22,20 +22,23 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.Charset;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.cli2.Option;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable;
 import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
@@ -115,7 +118,7 @@ public final class ItemSimilarityJob ext
     
"org.apache.mahout.cf.taste.hadoop.similarity.item.ItemSimilarityJob.numberOfUsers";
 
   @Override
-  public int run(String[] args) throws IOException {
+  public int run(String[] args) throws IOException, ClassNotFoundException, 
InterruptedException {
 
     Option similarityClassOpt = AbstractJob.buildOption("similarityClassname", 
"s",
     "Name of distributed similarity class to instantiate");
@@ -129,77 +132,79 @@ public final class ItemSimilarityJob ext
 
     String distributedSimilarityClassname = 
parsedArgs.get("--similarityClassname");
 
-    String inputPath = originalConf.get("mapred.input.dir");
-    String outputPath = originalConf.get("mapred.output.dir");
-    String tempDirPath = parsedArgs.get("--tempDir");
-
-    String countUsersPath = tempDirPath + "/countUsers";
-    String itemVectorsPath = tempDirPath + "/itemVectors";
-    String userVectorsPath = tempDirPath + "/userVectors";
-
-    /* count all unique users */
-    JobConf countUsers = prepareJobConf(inputPath,
-                                         countUsersPath,
-                                         TextInputFormat.class,
-                                         CountUsersMapper.class,
-                                         CountUsersKeyWritable.class,
-                                         VarLongWritable.class,
-                                         CountUsersReducer.class,
-                                         VarIntWritable.class,
-                                         NullWritable.class,
-                                         TextOutputFormat.class);
-
-    countUsers.setPartitionerClass(
-        CountUsersKeyWritable.CountUsersPartitioner.class);
-    countUsers.setOutputValueGroupingComparator(
-        CountUsersKeyWritable.CountUsersGroupComparator.class);
-
-    JobClient.runJob(countUsers);
-
-    int numberOfUsers =
-        readNumberOfUsers(countUsers, (countUsersPath + "/part-00000"));
-
-    JobConf itemVectors = prepareJobConf(inputPath,
-                                         itemVectorsPath,
-                                         TextInputFormat.class,
-                                         ToUserPrefsMapper.class,
-                                         VarLongWritable.class,
-                                         EntityPrefWritable.class,
-                                         ToItemVectorReducer.class,
-                                         VarLongWritable.class,
-                                         EntityPrefWritableArrayWritable.class,
-                                         SequenceFileOutputFormat.class);
-    JobClient.runJob(itemVectors);
-
-    JobConf userVectors = prepareJobConf(itemVectorsPath,
-                                         userVectorsPath,
-                                         SequenceFileInputFormat.class,
-                                         PreferredItemsPerUserMapper.class,
-                                         VarLongWritable.class,
-                                         
ItemPrefWithItemVectorWeightWritable.class,
-                                         PreferredItemsPerUserReducer.class,
-                                         VarLongWritable.class,
-                                         
ItemPrefWithItemVectorWeightArrayWritable.class,
-                                         SequenceFileOutputFormat.class);
-
-    userVectors.set(DISTRIBUTED_SIMILARITY_CLASSNAME, 
distributedSimilarityClassname);
-    JobClient.runJob(userVectors);
-
-    JobConf similarity = prepareJobConf(userVectorsPath,
-                                        outputPath,
-                                        SequenceFileInputFormat.class,
-                                        CopreferredItemsMapper.class,
-                                        ItemPairWritable.class,
-                                        CoRating.class,
-                                        SimilarityReducer.class,
-                                        EntityEntityWritable.class,
-                                        DoubleWritable.class,
-                                        TextOutputFormat.class);
+    Path inputPath = new Path(originalConf.get("mapred.input.dir"));
+    Path outputPath = new Path(originalConf.get("mapred.output.dir"));
+    Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
+
+    Path countUsersPath = new Path(tempDirPath, "countUsers");
+    Path itemVectorsPath = new Path(tempDirPath, "itemVectors");
+    Path userVectorsPath = new Path(tempDirPath, "userVectors");
+
+    AtomicInteger currentPhase = new AtomicInteger();
+
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      /* count all unique users */
+      Job countUsers = prepareJob(inputPath,
+                                  countUsersPath,
+                                  TextInputFormat.class,
+                                  CountUsersMapper.class,
+                                  CountUsersKeyWritable.class,
+                                  VarLongWritable.class,
+                                  CountUsersReducer.class,
+                                  VarIntWritable.class,
+                                  NullWritable.class,
+                                  TextOutputFormat.class);
+      
countUsers.setPartitionerClass(CountUsersKeyWritable.CountUsersPartitioner.class);
+      
countUsers.setGroupingComparatorClass(CountUsersKeyWritable.CountUsersGroupComparator.class);
+      countUsers.waitForCompletion(true);
+    }
 
-    similarity.set(DISTRIBUTED_SIMILARITY_CLASSNAME, 
distributedSimilarityClassname);
-    similarity.setInt(NUMBER_OF_USERS, numberOfUsers);
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      Job itemVectors = prepareJob(inputPath,
+                                   itemVectorsPath,
+                                   TextInputFormat.class,
+                                   ToUserPrefsMapper.class,
+                                   VarLongWritable.class,
+                                   EntityPrefWritable.class,
+                                   ToItemVectorReducer.class,
+                                   VarLongWritable.class,
+                                   EntityPrefWritableArrayWritable.class,
+                                   SequenceFileOutputFormat.class);
+      itemVectors.waitForCompletion(true);
+    }
 
-    JobClient.runJob(similarity);
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      Job userVectors = prepareJob(itemVectorsPath,
+                                   userVectorsPath,
+                                   SequenceFileInputFormat.class,
+                                   PreferredItemsPerUserMapper.class,
+                                   VarLongWritable.class,
+                                   ItemPrefWithItemVectorWeightWritable.class,
+                                   PreferredItemsPerUserReducer.class,
+                                   VarLongWritable.class,
+                                   
ItemPrefWithItemVectorWeightArrayWritable.class,
+                                   SequenceFileOutputFormat.class);
+      userVectors.getConfiguration().set(DISTRIBUTED_SIMILARITY_CLASSNAME, 
distributedSimilarityClassname);
+      userVectors.waitForCompletion(true);
+    }
+
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      Job similarity = prepareJob(userVectorsPath,
+                                  outputPath,
+                                  SequenceFileInputFormat.class,
+                                  CopreferredItemsMapper.class,
+                                  ItemPairWritable.class,
+                                  CoRating.class,
+                                  SimilarityReducer.class,
+                                  EntityEntityWritable.class,
+                                  DoubleWritable.class,
+                                  TextOutputFormat.class);
+      Configuration conf = similarity.getConfiguration();
+      int numberOfUsers = readNumberOfUsers(conf, countUsersPath);
+      conf.set(DISTRIBUTED_SIMILARITY_CLASSNAME, 
distributedSimilarityClassname);
+      conf.setInt(NUMBER_OF_USERS, numberOfUsers);
+      similarity.waitForCompletion(true);
+    }
 
     return 0;
   }
@@ -208,11 +213,17 @@ public final class ItemSimilarityJob ext
     ToolRunner.run(new ItemSimilarityJob(), args);
   }
 
-  static int readNumberOfUsers(JobConf conf, String outputFile) throws 
IOException {
+  static int readNumberOfUsers(Configuration conf, Path outputDir) throws 
IOException {
     FileSystem fs = FileSystem.get(conf);
+    Path outputFile = fs.listStatus(outputDir, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().startsWith("part-");
+      }
+    })[0].getPath();
     InputStream in = null;
     try  {
-      in = fs.open(new Path(outputFile));
+      in = fs.open(outputFile);
       ByteArrayOutputStream out = new ByteArrayOutputStream();
       IOUtils.copyBytes(in, out, conf);
       return Integer.parseInt(new String(out.toByteArray(), 
Charset.forName("UTF-8")).trim());

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserMapper.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserMapper.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserMapper.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserMapper.java
 Wed May 26 18:59:02 2010
@@ -21,28 +21,26 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
 import org.apache.mahout.cf.taste.hadoop.EntityPrefWritableArrayWritable;
 import org.apache.mahout.cf.taste.hadoop.similarity.DistributedItemSimilarity;
+import org.apache.mahout.common.iterator.IteratorIterable;
 import org.apache.mahout.math.VarLongWritable;
 
 /**
  * for each item-vector, we compute its weight here and map out all entries 
with the user as key,
  * so we can create the user-vectors in the reducer
  */
-public final class PreferredItemsPerUserMapper extends MapReduceBase
-    implements 
Mapper<VarLongWritable,EntityPrefWritableArrayWritable,VarLongWritable,ItemPrefWithItemVectorWeightWritable>
 {
+public final class PreferredItemsPerUserMapper extends
+    
Mapper<VarLongWritable,EntityPrefWritableArrayWritable,VarLongWritable,ItemPrefWithItemVectorWeightWritable>
 {
 
   private DistributedItemSimilarity distributedSimilarity;
 
   @Override
-  public void configure(JobConf jobConf) {
-    super.configure(jobConf);
+  public void setup(Context context) {
+    Configuration jobConf = context.getConfiguration();
     distributedSimilarity =
       
ItemSimilarityJob.instantiateSimilarity(jobConf.get(ItemSimilarityJob.DISTRIBUTED_SIMILARITY_CLASSNAME));
   }
@@ -50,15 +48,15 @@ public final class PreferredItemsPerUser
   @Override
   public void map(VarLongWritable item,
                   EntityPrefWritableArrayWritable userPrefsArray,
-                  
OutputCollector<VarLongWritable,ItemPrefWithItemVectorWeightWritable> output,
-                  Reporter reporter) throws IOException {
+                  Context context) throws IOException, InterruptedException {
 
     EntityPrefWritable[] userPrefs = userPrefsArray.getPrefs();
 
-    double weight = distributedSimilarity.weightOfItemVector(new 
UserPrefsIterator(userPrefs));
+    double weight = distributedSimilarity.weightOfItemVector(
+        new IteratorIterable<Float>(new UserPrefsIterator(userPrefs)));
 
     for (EntityPrefWritable userPref : userPrefs) {
-      output.collect(new VarLongWritable(userPref.getID()),
+      context.write(new VarLongWritable(userPref.getID()),
           new ItemPrefWithItemVectorWeightWritable(item.get(), weight, 
userPref.getPrefValue()));
     }
   }

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserReducer.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserReducer.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserReducer.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserReducer.java
 Wed May 26 18:59:02 2010
@@ -19,33 +19,28 @@ package org.apache.mahout.cf.taste.hadoo
 
 import java.io.IOException;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Set;
 
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.mahout.math.VarLongWritable;
 
-public final class PreferredItemsPerUserReducer extends MapReduceBase
-    implements 
Reducer<VarLongWritable,ItemPrefWithItemVectorWeightWritable,VarLongWritable,ItemPrefWithItemVectorWeightArrayWritable>
 {
+public final class PreferredItemsPerUserReducer extends
+    
Reducer<VarLongWritable,ItemPrefWithItemVectorWeightWritable,VarLongWritable,ItemPrefWithItemVectorWeightArrayWritable>
 {
 
   @Override
   public void reduce(VarLongWritable user,
-                     Iterator<ItemPrefWithItemVectorWeightWritable> itemPrefs,
-                     
OutputCollector<VarLongWritable,ItemPrefWithItemVectorWeightArrayWritable> 
output,
-                     Reporter reporter)
-      throws IOException {
+                     Iterable<ItemPrefWithItemVectorWeightWritable> itemPrefs,
+                     Context context)
+      throws IOException, InterruptedException {
 
     Set<ItemPrefWithItemVectorWeightWritable> itemPrefsWithItemVectorWeight
         = new HashSet<ItemPrefWithItemVectorWeightWritable>();
 
-    while (itemPrefs.hasNext()) {
-      itemPrefsWithItemVectorWeight.add(itemPrefs.next().clone());
+    for (ItemPrefWithItemVectorWeightWritable writable : itemPrefs) {
+      itemPrefsWithItemVectorWeight.add(writable.clone());
     }
 
-    output.collect(user, new ItemPrefWithItemVectorWeightArrayWritable(
+    context.write(user, new ItemPrefWithItemVectorWeightArrayWritable(
         itemPrefsWithItemVectorWeight.toArray(
         new 
ItemPrefWithItemVectorWeightWritable[itemPrefsWithItemVectorWeight.size()])));
   }

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/SimilarityReducer.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/SimilarityReducer.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/SimilarityReducer.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/SimilarityReducer.java
 Wed May 26 18:59:02 2010
@@ -18,14 +18,10 @@
 package org.apache.mahout.cf.taste.hadoop.similarity.item;
 
 import java.io.IOException;
-import java.util.Iterator;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable;
 import org.apache.mahout.cf.taste.hadoop.similarity.CoRating;
 import org.apache.mahout.cf.taste.hadoop.similarity.DistributedItemSimilarity;
@@ -34,15 +30,15 @@ import org.apache.mahout.cf.taste.hadoop
  * Finally compute the similarity for each item-pair, that has been corated at least once.
  * Computation is done with an external implementation of {@link DistributedItemSimilarity}.
  */
-public final class SimilarityReducer extends MapReduceBase
-    implements 
Reducer<ItemPairWritable,CoRating,EntityEntityWritable,DoubleWritable> {
+public final class SimilarityReducer extends
+    Reducer<ItemPairWritable,CoRating,EntityEntityWritable,DoubleWritable> {
 
   private DistributedItemSimilarity distributedItemSimilarity;
   private int numberOfUsers;
 
   @Override
-  public void configure(JobConf jobConf) {
-    super.configure(jobConf);
+  public void setup(Context context) {
+    Configuration jobConf = context.getConfiguration();
     distributedItemSimilarity =
       
ItemSimilarityJob.instantiateSimilarity(jobConf.get(ItemSimilarityJob.DISTRIBUTED_SIMILARITY_CLASSNAME));
     numberOfUsers = jobConf.getInt(ItemSimilarityJob.NUMBER_OF_USERS, -1);
@@ -53,16 +49,15 @@ public final class SimilarityReducer ext
 
   @Override
   public void reduce(ItemPairWritable pair,
-                     Iterator<CoRating> coRatings,
-                     OutputCollector<EntityEntityWritable,DoubleWritable> 
output,
-                     Reporter reporter)
-      throws IOException {
+                     Iterable<CoRating> coRatings,
+                     Context context)
+      throws IOException, InterruptedException {
 
     double similarity =
       distributedItemSimilarity.similarity(coRatings, pair.getItemAWeight(), 
pair.getItemBWeight(), numberOfUsers);
 
     if (!Double.isNaN(similarity)) {
-      output.collect(pair.getItemItemWritable(), new 
DoubleWritable(similarity));
+      context.write(pair.getItemItemWritable(), new 
DoubleWritable(similarity));
     }
   }
 

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorReducer.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorReducer.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorReducer.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorReducer.java
 Wed May 26 18:59:02 2010
@@ -19,13 +19,9 @@ package org.apache.mahout.cf.taste.hadoo
 
 import java.io.IOException;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Set;
 
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
 import org.apache.mahout.cf.taste.hadoop.EntityPrefWritableArrayWritable;
 import org.apache.mahout.math.VarLongWritable;
@@ -34,24 +30,22 @@ import org.apache.mahout.math.VarLongWri
  * For each single item, collect all users with their preferences
  * (thereby building the item vectors of the user-item-matrix)
  */
-public final class ToItemVectorReducer
-    extends MapReduceBase implements
+public final class ToItemVectorReducer extends
     
Reducer<VarLongWritable,EntityPrefWritable,VarLongWritable,EntityPrefWritableArrayWritable>
 {
 
   @Override
   public void reduce(VarLongWritable item,
-                     Iterator<EntityPrefWritable> userPrefs,
-                     
OutputCollector<VarLongWritable,EntityPrefWritableArrayWritable> output,
-                     Reporter reporter)
-      throws IOException {
+                     Iterable<EntityPrefWritable> userPrefs,
+                     Context context)
+      throws IOException, InterruptedException {
 
     Set<EntityPrefWritable> collectedUserPrefs = new 
HashSet<EntityPrefWritable>();
 
-    while (userPrefs.hasNext()) {
-      collectedUserPrefs.add(userPrefs.next().clone());
+    for (EntityPrefWritable writable : userPrefs) {
+      collectedUserPrefs.add(writable.clone());
     }
 
-    output.collect(item, new EntityPrefWritableArrayWritable(
+    context.write(item, new EntityPrefWritableArrayWritable(
         collectedUserPrefs.toArray(new 
EntityPrefWritable[collectedUserPrefs.size()])));
   }
 

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneAverageDiffsJob.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneAverageDiffsJob.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneAverageDiffsJob.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneAverageDiffsJob.java
 Wed May 26 18:59:02 2010
@@ -19,18 +19,19 @@ package org.apache.mahout.cf.taste.hadoo
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable;
 import org.apache.mahout.common.AbstractJob;
@@ -41,7 +42,7 @@ import org.apache.mahout.math.VarLongWri
 public final class SlopeOneAverageDiffsJob extends AbstractJob {
   
   @Override
-  public int run(String[] args) throws IOException {
+  public int run(String[] args) throws IOException, ClassNotFoundException, 
InterruptedException {
     
     Map<String,String> parsedArgs = AbstractJob.parseArguments(args);
     if (parsedArgs == null) {
@@ -49,23 +50,41 @@ public final class SlopeOneAverageDiffsJ
     }
     
     Configuration originalConf = getConf();
-    String prefsFile = originalConf.get("mapred.input.dir");
-    String outputPath = originalConf.get("mapred.output.dir");
-    String averagesOutputPath = parsedArgs.get("--tempDir");
-    
-    JobConf prefsToDiffsJobConf = prepareJobConf(prefsFile, averagesOutputPath,
-      TextInputFormat.class, ToItemPrefsMapper.class, VarLongWritable.class, 
EntityPrefWritable.class,
-      SlopeOnePrefsToDiffsReducer.class, EntityEntityWritable.class, 
FloatWritable.class,
-      SequenceFileOutputFormat.class);
-    JobClient.runJob(prefsToDiffsJobConf);
-    
-    JobConf diffsToAveragesJobConf = prepareJobConf(averagesOutputPath, 
outputPath,
-      SequenceFileInputFormat.class, IdentityMapper.class, 
EntityEntityWritable.class, FloatWritable.class,
-      SlopeOneDiffsToAveragesReducer.class, EntityEntityWritable.class, 
FloatWritable.class,
-      TextOutputFormat.class);
-    diffsToAveragesJobConf.setClass("mapred.output.compression.codec", 
GzipCodec.class,
-      CompressionCodec.class);
-    JobClient.runJob(diffsToAveragesJobConf);
+    Path prefsFile = new Path(originalConf.get("mapred.input.dir"));
+    Path outputPath = new Path(originalConf.get("mapred.output.dir"));
+    Path averagesOutputPath = new Path(parsedArgs.get("--tempDir"));
+
+    AtomicInteger currentPhase = new AtomicInteger();
+
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      Job prefsToDiffsJob = prepareJob(prefsFile,
+                                       averagesOutputPath,
+                                       TextInputFormat.class,
+                                       ToItemPrefsMapper.class,
+                                       VarLongWritable.class,
+                                       EntityPrefWritable.class,
+                                       SlopeOnePrefsToDiffsReducer.class,
+                                       EntityEntityWritable.class,
+                                       FloatWritable.class,
+                                       SequenceFileOutputFormat.class);
+      prefsToDiffsJob.waitForCompletion(true);
+    }
+
+
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      Job diffsToAveragesJob = prepareJob(averagesOutputPath,
+                                          outputPath,
+                                          SequenceFileInputFormat.class,
+                                          Mapper.class,
+                                          EntityEntityWritable.class,
+                                          FloatWritable.class,
+                                          SlopeOneDiffsToAveragesReducer.class,
+                                          EntityEntityWritable.class,
+                                          FloatWritable.class,
+                                          TextOutputFormat.class);
+      FileOutputFormat.setOutputCompressorClass(diffsToAveragesJob, 
GzipCodec.class);
+      diffsToAveragesJob.waitForCompletion(true);
+    }
     return 0;
   }
   

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneDiffsToAveragesReducer.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneDiffsToAveragesReducer.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneDiffsToAveragesReducer.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneDiffsToAveragesReducer.java
 Wed May 26 18:59:02 2010
@@ -18,29 +18,24 @@
 package org.apache.mahout.cf.taste.hadoop.slopeone;
 
 import java.io.IOException;
-import java.util.Iterator;
 
 import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable;
 
-public final class SlopeOneDiffsToAveragesReducer extends MapReduceBase 
implements
+public final class SlopeOneDiffsToAveragesReducer extends
     Reducer<EntityEntityWritable,FloatWritable, 
EntityEntityWritable,FloatWritable> {
   
   @Override
   public void reduce(EntityEntityWritable key,
-                     Iterator<FloatWritable> values,
-                     OutputCollector<EntityEntityWritable,FloatWritable> 
output,
-                     Reporter reporter) throws IOException {
+                     Iterable<FloatWritable> values,
+                     Context context) throws IOException, InterruptedException 
{
     int count = 0;
     double total = 0.0;
-    while (values.hasNext()) {
-      total += values.next().get();
+    for (FloatWritable value : values) {
+      total += value.get();
       count++;
     }
-    output.collect(key, new FloatWritable((float) (total / count)));
+    context.write(key, new FloatWritable((float) (total / count)));
   }
 }
\ No newline at end of file

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOnePrefsToDiffsReducer.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOnePrefsToDiffsReducer.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOnePrefsToDiffsReducer.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOnePrefsToDiffsReducer.java
 Wed May 26 18:59:02 2010
@@ -20,29 +20,24 @@ package org.apache.mahout.cf.taste.hadoo
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable;
 import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
 import org.apache.mahout.math.VarLongWritable;
 
-public final class SlopeOnePrefsToDiffsReducer extends MapReduceBase implements
+public final class SlopeOnePrefsToDiffsReducer extends
     Reducer<VarLongWritable,EntityPrefWritable,EntityEntityWritable,FloatWritable> {
   
   @Override
   public void reduce(VarLongWritable key,
-                     Iterator<EntityPrefWritable> values,
-                     OutputCollector<EntityEntityWritable,FloatWritable> output,
-                     Reporter reporter) throws IOException {
+                     Iterable<EntityPrefWritable> values,
+                     Context context) throws IOException, InterruptedException {
     List<EntityPrefWritable> prefs = new ArrayList<EntityPrefWritable>();
-    while (values.hasNext()) {
-      prefs.add(new EntityPrefWritable(values.next()));
+    for (EntityPrefWritable writable : values) {
+      prefs.add(new EntityPrefWritable(writable));
     }
     Collections.sort(prefs, ByItemIDComparator.getInstance());
     int size = prefs.size();
@@ -54,7 +49,7 @@ public final class SlopeOnePrefsToDiffsR
         EntityPrefWritable second = prefs.get(j);
         long itemBID = second.getID();
         float itemBValue = second.getPrefValue();
-        output.collect(new EntityEntityWritable(itemAID, itemBID), new FloatWritable(itemBValue - itemAValue));
+        context.write(new EntityEntityWritable(itemAID, itemBID), new FloatWritable(itemBValue - itemAValue));
       }
     }
   }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java Wed May 26 18:59:02 2010
@@ -31,16 +31,15 @@ import org.apache.commons.cli2.builder.A
 import org.apache.commons.cli2.builder.DefaultOptionBuilder;
 import org.apache.commons.cli2.builder.GroupBuilder;
 import org.apache.commons.cli2.commandline.Parser;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.util.Tool;
 import org.apache.mahout.common.commandline.DefaultOptionCreator;
 import org.slf4j.Logger;
@@ -166,47 +165,54 @@ public abstract class AbstractJob extend
     return !phaseSkipped;
   }
   
-  protected JobConf prepareJobConf(String inputPath,
-                                   String outputPath,
-                                   Class<? extends InputFormat> inputFormat,
-                                   Class<? extends Mapper> mapper,
-                                   Class<? extends Writable> mapperKey,
-                                   Class<? extends Writable> mapperValue,
-                                   Class<? extends Reducer> reducer,
-                                   Class<? extends Writable> reducerKey,
-                                   Class<? extends Writable> reducerValue,
-                                   Class<? extends OutputFormat> outputFormat) throws IOException {
-    
-    JobConf jobConf = new JobConf(getConf(), getClass());
-    FileSystem fs = FileSystem.get(jobConf);
-    
-    Path inputPathPath = new Path(inputPath).makeQualified(fs);
-    Path outputPathPath = new Path(outputPath).makeQualified(fs);
-    
-    jobConf.setClass("mapred.input.format.class", inputFormat, InputFormat.class);
-    // Override this:
-    jobConf.set("mapred.input.dir", StringUtils.escapeString(inputPathPath.toString()));
-    
-    jobConf.setClass("mapred.mapper.class", mapper, Mapper.class);
-    jobConf.setClass("mapred.mapoutput.key.class", mapperKey, Writable.class);
-    jobConf.setClass("mapred.mapoutput.value.class", mapperValue, Writable.class);
-    
-    jobConf.setClass("mapred.reducer.class", reducer, Reducer.class);
-    jobConf.setClass("mapred.output.key.class", reducerKey, Writable.class);
-    jobConf.setClass("mapred.output.value.class", reducerValue, Writable.class);
+  protected Job prepareJob(Path inputPath,
+                           Path outputPath,
+                           Class<? extends InputFormat> inputFormat,
+                           Class<? extends Mapper> mapper,
+                           Class<? extends Writable> mapperKey,
+                           Class<? extends Writable> mapperValue,
+                           Class<? extends Reducer> reducer,
+                           Class<? extends Writable> reducerKey,
+                           Class<? extends Writable> reducerValue,
+                           Class<? extends OutputFormat> outputFormat) throws IOException {
+    
+    Job job = new Job(new Configuration(getConf()));
+    Configuration jobConf = job.getConfiguration();
+
+    if (reducer.equals(Reducer.class)) {
+      if (mapper.equals(Mapper.class)) {
+        throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");
+      }
+      job.setJarByClass(mapper);
+    } else {
+      job.setJarByClass(reducer);
+    }
+
+    job.setInputFormatClass(inputFormat);
+    jobConf.set("mapred.input.dir", inputPath.toString());
+
+    job.setMapperClass(mapper);
+    job.setMapOutputKeyClass(mapperKey);
+    job.setMapOutputValueClass(mapperValue);
+
     jobConf.setBoolean("mapred.compress.map.output", true);
 
-    String customJobName = jobConf.get("mapred.job.name");
-    if (customJobName == null) {
+    job.setReducerClass(reducer);
+    job.setOutputKeyClass(reducerKey);
+    job.setOutputValueClass(reducerValue);
+
+    String customJobName = job.getJobName();
+    if (customJobName == null || customJobName.trim().length() == 0) {
       customJobName = getClass().getSimpleName();
     }
-    jobConf.set("mapred.job.name", customJobName + '-' + mapper.getSimpleName() + '-' + reducer.getSimpleName());
+    customJobName += '-' + mapper.getSimpleName();
+    customJobName += '-' + reducer.getSimpleName();
+    job.setJobName(customJobName);
 
-    jobConf.setClass("mapred.output.format.class", outputFormat, OutputFormat.class);
-    // Override this:    
-    jobConf.set("mapred.output.dir", StringUtils.escapeString(outputPathPath.toString()));
+    job.setOutputFormatClass(outputFormat);
+    jobConf.set("mapred.output.dir", outputPath.toString());
     
-    return jobConf;
+    return job;
   }
   
 }

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedItemSimilarityTestCase.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedItemSimilarityTestCase.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedItemSimilarityTestCase.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedItemSimilarityTestCase.java Wed May 26 18:59:02 2010
@@ -57,8 +57,8 @@ public abstract class DistributedItemSim
       }
     }
 
-    double weightX = similarity.weightOfItemVector(nonNaNPrefsX.iterator());
-    double weightY = similarity.weightOfItemVector(nonNaNPrefsY.iterator());
+    double weightX = similarity.weightOfItemVector(nonNaNPrefsX);
+    double weightY = similarity.weightOfItemVector(nonNaNPrefsY);
 
     List<CoRating> coRatings = new LinkedList<CoRating>();
 
@@ -71,7 +71,7 @@ public abstract class DistributedItemSim
       }
     }
 
-    double result = similarity.similarity(coRatings.iterator(), weightX, weightY, numberOfUsers);
+    double result = similarity.similarity(coRatings, weightX, weightY, numberOfUsers);
     assertEquals(expectedSimilarity, result, EPSILON);
   }
 

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityTest.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityTest.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityTest.java Wed May 26 18:59:02 2010
@@ -22,6 +22,7 @@ import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileReader;
 import java.io.FileWriter;
+import java.io.FilenameFilter;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
@@ -29,12 +30,13 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable;
 import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
 import org.apache.mahout.cf.taste.hadoop.EntityPrefWritableArrayWritable;
@@ -50,32 +52,32 @@ import org.easymock.classextension.EasyM
 /**
  * Unit tests for the mappers and reducers in org.apache.mahout.cf.taste.hadoop.similarity
  * Integration test with a mini-file at the end
- *
  */
 public final class ItemSimilarityTest extends MahoutTestCase {
 
   public void testUserPrefsPerItemMapper() throws Exception {
-    OutputCollector<VarLongWritable,VarLongWritable> output =
-        EasyMock.createMock(OutputCollector.class);
-    output.collect(new VarLongWritable(34L), new EntityPrefWritable(12L, 2.3f));
-    EasyMock.replay(output);
+    Mapper<LongWritable,Text,VarLongWritable,VarLongWritable>.Context context =
+        EasyMock.createMock(Mapper.Context.class);
+    context.write(new VarLongWritable(34L), new EntityPrefWritable(12L, 2.3f));
+    EasyMock.replay(context);
 
-    new ToUserPrefsMapper().map(new LongWritable(), new Text("12,34,2.3"), output, null);
+    new ToUserPrefsMapper().map(new LongWritable(), new Text("12,34,2.3"), context);
 
-    EasyMock.verify(output);
+    EasyMock.verify(context);
   }
 
   public void testCountUsersMapper() throws Exception {
-    OutputCollector<CountUsersKeyWritable,VarLongWritable> output = EasyMock.createMock(OutputCollector.class);
-    output.collect(keyForUserID(12L), EasyMock.eq(new VarLongWritable(12L)));
-    output.collect(keyForUserID(35L), EasyMock.eq(new VarLongWritable(35L)));
-    EasyMock.replay(output);
+    Mapper<LongWritable,Text,CountUsersKeyWritable,VarLongWritable>.Context context =
+        EasyMock.createMock(Mapper.Context.class);
+    context.write(keyForUserID(12L), EasyMock.eq(new VarLongWritable(12L)));
+    context.write(keyForUserID(35L), EasyMock.eq(new VarLongWritable(35L)));
+    EasyMock.replay(context);
 
     CountUsersMapper mapper = new CountUsersMapper();
-    mapper.map(null, new Text("12,100,1.3"), output, null);
-    mapper.map(null, new Text("35,100,3.0"), output, null);
+    mapper.map(null, new Text("12,100,1.3"), context);
+    mapper.map(null, new Text("35,100,3.0"), context);
 
-    EasyMock.verify(output);
+    EasyMock.verify(context);
   }
 
   static CountUsersKeyWritable keyForUserID(final long userID) {
@@ -98,17 +100,18 @@ public final class ItemSimilarityTest ex
 
   public void testCountUsersReducer() throws Exception {
 
-    OutputCollector<VarIntWritable,NullWritable> output = EasyMock.createMock(OutputCollector.class);
-    output.collect(new VarIntWritable(3), NullWritable.get());
-    EasyMock.replay(output);
+    Reducer<CountUsersKeyWritable,VarLongWritable,VarIntWritable,NullWritable>.Context context =
+        EasyMock.createMock(Reducer.Context.class);
+    context.write(new VarIntWritable(3), NullWritable.get());
+    EasyMock.replay(context);
 
     List<VarLongWritable> userIDs = Arrays.asList(new VarLongWritable(1L), new VarLongWritable(1L),
-                                                new VarLongWritable(3L), new VarLongWritable(5L),
-                                                new VarLongWritable(5L), new VarLongWritable(5L));
+                                                  new VarLongWritable(3L), new VarLongWritable(5L),
+                                                  new VarLongWritable(5L), new VarLongWritable(5L));
 
-    new CountUsersReducer().reduce(null, userIDs.iterator(), output, null);
+    new CountUsersReducer().reduce(null, userIDs, context);
 
-    EasyMock.verify(output);
+    EasyMock.verify(context);
   }
 
   public void testToItemVectorReducer() throws Exception {
@@ -116,16 +119,16 @@ public final class ItemSimilarityTest ex
     List<EntityPrefWritable> userPrefs = Arrays.asList(
         new EntityPrefWritable(34L, 1.0f), new EntityPrefWritable(56L, 2.0f));
 
-    OutputCollector<VarLongWritable,EntityPrefWritableArrayWritable> output =
-        EasyMock.createMock(OutputCollector.class);
+    Reducer<VarLongWritable,EntityPrefWritable,VarLongWritable,EntityPrefWritableArrayWritable>.Context context =
+        EasyMock.createMock(Reducer.Context.class);
 
-    output.collect(EasyMock.eq(new VarLongWritable(12L)), equalToUserPrefs(userPrefs));
+    context.write(EasyMock.eq(new VarLongWritable(12L)), equalToUserPrefs(userPrefs));
 
-    EasyMock.replay(output);
+    EasyMock.replay(context);
 
-    new ToItemVectorReducer().reduce(new VarLongWritable(12L), userPrefs.iterator(), output, null);
+    new ToItemVectorReducer().reduce(new VarLongWritable(12L), userPrefs, context);
 
-    EasyMock.verify(output);
+    EasyMock.verify(context);
   }
 
 
@@ -162,30 +165,32 @@ public final class ItemSimilarityTest ex
   }
 
   public void testPreferredItemsPerUserMapper() throws Exception {
-    OutputCollector<VarLongWritable,ItemPrefWithItemVectorWeightWritable> output =
-        EasyMock.createMock(OutputCollector.class);
+    Mapper<VarLongWritable,EntityPrefWritableArrayWritable,VarLongWritable,ItemPrefWithItemVectorWeightWritable>.Context context =
+        EasyMock.createMock(Mapper.Context.class);
     EntityPrefWritableArrayWritable userPrefs = new EntityPrefWritableArrayWritable(
         new EntityPrefWritable[] {
             new EntityPrefWritable(12L, 2.0f),
             new EntityPrefWritable(56L, 3.0f) });
 
+    Configuration conf = new Configuration();
+    EasyMock.expect(context.getConfiguration()).andStubReturn(conf);
+
     double weight =
-      new DistributedUncenteredZeroAssumingCosineSimilarity().weightOfItemVector(Arrays.asList(2.0f, 3.0f).iterator());
+      new DistributedUncenteredZeroAssumingCosineSimilarity().weightOfItemVector(Arrays.asList(2.0f, 3.0f));
 
-    output.collect(new VarLongWritable(12L), new ItemPrefWithItemVectorWeightWritable(34L, weight, 2.0f));
-    output.collect(new VarLongWritable(56L), new ItemPrefWithItemVectorWeightWritable(34L, weight, 3.0f));
+    context.write(new VarLongWritable(12L), new ItemPrefWithItemVectorWeightWritable(34L, weight, 2.0f));
+    context.write(new VarLongWritable(56L), new ItemPrefWithItemVectorWeightWritable(34L, weight, 3.0f));
 
-    JobConf conf = new JobConf();
     conf.set(ItemSimilarityJob.DISTRIBUTED_SIMILARITY_CLASSNAME,
         "org.apache.mahout.cf.taste.hadoop.similarity.DistributedUncenteredZeroAssumingCosineSimilarity");
 
-    EasyMock.replay(output);
+    EasyMock.replay(context);
 
     PreferredItemsPerUserMapper mapper = new PreferredItemsPerUserMapper();
-    mapper.configure(conf);
-    mapper.map(new VarLongWritable(34L), userPrefs, output, null);
+    mapper.setup(context);
+    mapper.map(new VarLongWritable(34L), userPrefs, context);
 
-    EasyMock.verify(output);
+    EasyMock.verify(context);
   }
 
   public void testPreferredItemsPerUserReducer() throws Exception {
@@ -194,17 +199,16 @@ public final class ItemSimilarityTest ex
         Arrays.asList(new ItemPrefWithItemVectorWeightWritable(34L, 5.0, 1.0f),
                       new ItemPrefWithItemVectorWeightWritable(56L, 7.0, 2.0f));
 
-    OutputCollector<VarLongWritable,ItemPrefWithItemVectorWeightArrayWritable> output =
-        EasyMock.createMock(OutputCollector.class);
+    Reducer<VarLongWritable,ItemPrefWithItemVectorWeightWritable,VarLongWritable,ItemPrefWithItemVectorWeightArrayWritable>.Context context =
+        EasyMock.createMock(Reducer.Context.class);
 
-    output.collect(EasyMock.eq(new VarLongWritable(12L)), equalToItemPrefs(itemPrefs));
+    context.write(EasyMock.eq(new VarLongWritable(12L)), equalToItemPrefs(itemPrefs));
 
-    EasyMock.replay(output);
+    EasyMock.replay(context);
 
-    new PreferredItemsPerUserReducer().reduce(
-        new VarLongWritable(12L), itemPrefs.iterator(), output, null);
+    new PreferredItemsPerUserReducer().reduce(new VarLongWritable(12L), itemPrefs, context);
 
-    EasyMock.verify(output);
+    EasyMock.verify(context);
   }
 
   static ItemPrefWithItemVectorWeightArrayWritable equalToItemPrefs(
@@ -239,8 +243,8 @@ public final class ItemSimilarityTest ex
   }
 
   public void testCopreferredItemsMapper() throws Exception {
-    OutputCollector<ItemPairWritable, CoRating> output =
-        EasyMock.createMock(OutputCollector.class);
+    Mapper<VarLongWritable,ItemPrefWithItemVectorWeightArrayWritable,ItemPairWritable,CoRating>.Context context =
+        EasyMock.createMock(Mapper.Context.class);
     ItemPrefWithItemVectorWeightArrayWritable itemPrefs =
         EasyMock.createMock(ItemPrefWithItemVectorWeightArrayWritable.class);
 
@@ -248,36 +252,37 @@ public final class ItemSimilarityTest ex
         new ItemPrefWithItemVectorWeightWritable(34L, 2.0, 1.0f), new ItemPrefWithItemVectorWeightWritable(56L, 3.0, 2.0f),
         new ItemPrefWithItemVectorWeightWritable(78L, 4.0, 3.0f) });
 
-    output.collect(new ItemPairWritable(34L, 56L, 2.0, 3.0), new CoRating(1.0f, 2.0f));
-    output.collect(new ItemPairWritable(34L, 78L, 2.0, 4.0), new CoRating(1.0f, 3.0f));
-    output.collect(new ItemPairWritable(56L, 78L, 3.0, 4.0), new CoRating(2.0f, 3.0f));
+    context.write(new ItemPairWritable(34L, 56L, 2.0, 3.0), new CoRating(1.0f, 2.0f));
+    context.write(new ItemPairWritable(34L, 78L, 2.0, 4.0), new CoRating(1.0f, 3.0f));
+    context.write(new ItemPairWritable(56L, 78L, 3.0, 4.0), new CoRating(2.0f, 3.0f));
 
-    EasyMock.replay(output, itemPrefs);
+    EasyMock.replay(context, itemPrefs);
 
-    new CopreferredItemsMapper().map(new VarLongWritable(), itemPrefs, output, null);
+    new CopreferredItemsMapper().map(new VarLongWritable(), itemPrefs, context);
 
-    EasyMock.verify(output, itemPrefs);
+    EasyMock.verify(context, itemPrefs);
   }
 
   public void testSimilarityReducer() throws Exception {
-    OutputCollector<EntityEntityWritable,DoubleWritable> output =
-        EasyMock.createMock(OutputCollector.class);
+    Reducer<ItemPairWritable,CoRating,EntityEntityWritable,DoubleWritable>.Context context =
+        EasyMock.createMock(Reducer.Context.class);
+    Configuration conf = new Configuration();
+    EasyMock.expect(context.getConfiguration()).andStubReturn(conf);
 
-    JobConf conf = new JobConf();
     conf.set(ItemSimilarityJob.DISTRIBUTED_SIMILARITY_CLASSNAME,
         "org.apache.mahout.cf.taste.hadoop.similarity.DistributedUncenteredZeroAssumingCosineSimilarity");
     conf.setInt(ItemSimilarityJob.NUMBER_OF_USERS, 1);
 
-    output.collect(new EntityEntityWritable(12L, 34L), new DoubleWritable(0.5));
+    context.write(new EntityEntityWritable(12L, 34L), new DoubleWritable(0.5));
 
-    EasyMock.replay(output);
+    EasyMock.replay(context);
 
     SimilarityReducer reducer = new SimilarityReducer();
-    reducer.configure(conf);
-    reducer.reduce(new ItemPairWritable(12L, 34L, 2.0, 10.0), Arrays.asList(new CoRating(2.5f, 2.0f),
-            new CoRating(2.0f, 2.5f)).iterator(), output, null);
+    reducer.setup(context);
+    reducer.reduce(new ItemPairWritable(12L, 34L, 2.0, 10.0),
+                   Arrays.asList(new CoRating(2.5f, 2.0f),new CoRating(2.0f, 2.5f)), context);
 
-    EasyMock.verify(output);
+    EasyMock.verify(context);
   }
 
   public void testCompleteJob() throws Exception {
@@ -311,19 +316,25 @@ public final class ItemSimilarityTest ex
     Configuration conf = new Configuration();
     conf.set("mapred.input.dir", inputFile.getAbsolutePath());
     conf.set("mapred.output.dir", outputDir.getAbsolutePath());
-    conf.set("mapred.output.compress", Boolean.FALSE.toString());
+    conf.setBoolean("mapred.output.compress", false);
 
     similarityJob.setConf(conf);
 
     similarityJob.run(new String[] { "--tempDir", tmpDir.getAbsolutePath(), "--similarityClassname",
         "org.apache.mahout.cf.taste.hadoop.similarity.DistributedUncenteredZeroAssumingCosineSimilarity"});
 
-    File countUsersPart = new File(new File(tmpDir, "countUsers"), "part-00000");
-    int numberOfUsers = ItemSimilarityJob.readNumberOfUsers(new JobConf(), countUsersPart.getAbsolutePath());
+    File countUsersPart = new File(tmpDir, "countUsers");
+    int numberOfUsers = ItemSimilarityJob.readNumberOfUsers(new Configuration(),
+                                                            new Path(countUsersPart.getAbsolutePath()));
 
     assertEquals(3, numberOfUsers);
 
-    File outPart = new File(outputDir, "part-00000");
+    File outPart = outputDir.listFiles(new FilenameFilter() {
+      @Override
+      public boolean accept(File dir, String name) {
+        return name.startsWith("part-");
+      }
+    })[0];
     BufferedReader reader = new BufferedReader(new FileReader(outPart));
 
     String line;

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java Wed May 26 18:59:02 2010
@@ -18,16 +18,13 @@
 package org.apache.mahout.text;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.common.AbstractJob;
 
@@ -38,36 +35,32 @@ public class TextParagraphSplittingJob e
   @Override
   public int run(String[] strings) throws Exception {
     Configuration originalConf = getConf();
-    JobConf conf = prepareJobConf(originalConf.get("mapred.input.dir"),
-                                  originalConf.get("mapred.output.dir"),
-                                  SequenceFileInputFormat.class,
-                                  SplitMap.class,
-                                  Text.class,
-                                  Text.class,
-                                  Reducer.class,
-                                  Text.class,
-                                  Text.class,
-                                  SequenceFileOutputFormat.class);
-    conf.setNumReduceTasks(0);
-
-    JobClient.runJob(conf).waitForCompletion();
+    Job job = prepareJob(new Path(originalConf.get("mapred.input.dir")),
+                         new Path(originalConf.get("mapred.output.dir")),
+                         SequenceFileInputFormat.class,
+                         SplitMap.class,
+                         Text.class,
+                         Text.class,
+                         Reducer.class,
+                         Text.class,
+                         Text.class,
+                         SequenceFileOutputFormat.class);
+    job.setNumReduceTasks(0);
+    job.waitForCompletion(true);
     return 1;
   }
 
-  public static class SplitMap extends MapReduceBase implements Mapper<Text,Text,Text,Text> {
+  public static class SplitMap extends Mapper<Text,Text,Text,Text> {
 
     @Override
-    public void map(Text key,
-                    Text text,
-                    OutputCollector<Text, Text> out,
-                    Reporter reporter) throws IOException {
+    public void map(Text key, Text text, Context context) throws IOException, InterruptedException {
       Text outText = new Text();
       int loc = 0;
       while(loc >= 0 && loc < text.getLength()) {
         int nextLoc = text.find("\n\n", loc+1);
-        if(nextLoc > 0) {
+        if (nextLoc > 0) {
           outText.set(text.getBytes(), loc, (nextLoc - loc));
-          out.collect(key, outText);
+          context.write(key, outText);
         }
         loc = nextLoc;
       }


Reply via email to