Author: ssc
Date: Wed Mar  6 12:22:01 2013
New Revision: 1453310

URL: http://svn.apache.org/r1453310
Log:
MAHOUT-1151 Object reuse in distributed ALS

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/common/FixedSizePriorityQueue.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/recommender/GenericRecommendedItem.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/recommender/RecommendedItem.java
    mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/common/FixedSizePriorityQueue.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/common/FixedSizePriorityQueue.java?rev=1453310&r1=1453309&r2=1453310&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/common/FixedSizePriorityQueue.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/common/FixedSizePriorityQueue.java Wed Mar  6 12:22:01 2013
@@ -71,7 +71,7 @@ abstract class FixedSizePriorityQueue<T>
     return topItems;
   }
 
-  protected T peek() {
+  public T peek() {
     return queue.peek();
   }
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java?rev=1453310&r1=1453309&r2=1453310&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java Wed Mar  6 12:22:01 2013
@@ -201,6 +201,12 @@ public class ParallelALSFactorizationJob
   }
 
   static class ItemRatingVectorsMapper extends 
Mapper<LongWritable,Text,IntWritable,VectorWritable> {
+
+    private IntWritable itemIDWritable = new IntWritable();
+    private VectorWritable ratingsWritable = new VectorWritable(true);
+
+    private Vector ratings = new 
SequentialAccessSparseVector(Integer.MAX_VALUE, 1);
+
     @Override
     protected void map(LongWritable offset, Text line, Context ctx) throws 
IOException, InterruptedException {
       String[] tokens = TasteHadoopUtils.splitPrefTokens(line.toString());
@@ -208,10 +214,15 @@ public class ParallelALSFactorizationJob
       int itemID = Integer.parseInt(tokens[1]);
       float rating = Float.parseFloat(tokens[2]);
 
-      Vector ratings = new RandomAccessSparseVector(Integer.MAX_VALUE, 1);
-      ratings.set(userID, rating);
+      ratings.setQuick(userID, rating);
 
-      ctx.write(new IntWritable(itemID), new VectorWritable(ratings, true));
+      itemIDWritable.set(itemID);
+      ratingsWritable.set(ratings);
+
+      ctx.write(itemIDWritable, ratingsWritable);
+
+      // prepare instance for reuse
+      ratings.setQuick(userID, 0.0d);
     }
   }
 
@@ -240,6 +251,8 @@ public class ParallelALSFactorizationJob
     private int numFeatures;
     private OpenIntObjectHashMap<Vector> UorM;
 
+    private VectorWritable uiOrmjWritable = new VectorWritable();
+
     @Override
     protected void setup(Mapper.Context ctx) throws IOException, 
InterruptedException {
       lambda = Double.parseDouble(ctx.getConfiguration().get(LAMBDA));
@@ -254,7 +267,8 @@ public class ParallelALSFactorizationJob
     @Override
     protected void map(IntWritable userOrItemID, VectorWritable 
ratingsWritable, Context ctx)
         throws IOException, InterruptedException {
-      Vector ratings = new SequentialAccessSparseVector(ratingsWritable.get());
+      Vector ratings = ratingsWritable.get();
+
       List<Vector> featureVectors = Lists.newArrayList();
       Iterator<Vector.Element> interactions = ratings.iterateNonZero();
       while (interactions.hasNext()) {
@@ -264,7 +278,8 @@ public class ParallelALSFactorizationJob
 
       Vector uiOrmj = AlternatingLeastSquaresSolver.solve(featureVectors, 
ratings, lambda, numFeatures);
 
-      ctx.write(userOrItemID, new VectorWritable(uiOrmj));
+      uiOrmjWritable.set(uiOrmj);
+      ctx.write(userOrItemID, uiOrmjWritable);
     }
   }
 
@@ -272,6 +287,8 @@ public class ParallelALSFactorizationJob
 
     private ImplicitFeedbackAlternatingLeastSquaresSolver solver;
 
+    private VectorWritable uiOrmjWritable = new VectorWritable();
+
     @Override
     protected void setup(Mapper.Context ctx) throws IOException, 
InterruptedException {
       double lambda = Double.parseDouble(ctx.getConfiguration().get(LAMBDA));
@@ -289,15 +306,20 @@ public class ParallelALSFactorizationJob
     @Override
     protected void map(IntWritable userOrItemID, VectorWritable 
ratingsWritable, Context ctx)
         throws IOException, InterruptedException {
-      Vector ratings = new SequentialAccessSparseVector(ratingsWritable.get());
 
-      Vector uiOrmj = solver.solve(ratings);
+      Vector uiOrmj = solver.solve(ratingsWritable.get());
 
-      ctx.write(userOrItemID, new VectorWritable(uiOrmj));
+      uiOrmjWritable.set(uiOrmj);
+      ctx.write(userOrItemID, uiOrmjWritable);
     }
   }
 
   static class AverageRatingMapper extends 
Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+    private IntWritable firstIndex = new IntWritable(0);
+    private Vector featureVector = new 
RandomAccessSparseVector(Integer.MAX_VALUE, 1);
+    private VectorWritable featureVectorWritable = new VectorWritable();
+
     @Override
     protected void map(IntWritable r, VectorWritable v, Context ctx) throws 
IOException, InterruptedException {
       RunningAverage avg = new FullRunningAverage();
@@ -305,9 +327,13 @@ public class ParallelALSFactorizationJob
       while (elements.hasNext()) {
         avg.addDatum(elements.next().get());
       }
-      Vector vector = new RandomAccessSparseVector(Integer.MAX_VALUE, 1);
-      vector.setQuick(r.get(), avg.getAverage());
-      ctx.write(new IntWritable(0), new VectorWritable(vector));
+
+      featureVector.setQuick(r.get(), avg.getAverage());
+      featureVectorWritable.set(featureVector);
+      ctx.write(firstIndex, featureVectorWritable);
+
+      // prepare instance for reuse
+      featureVector.setQuick(r.get(), 0.0d);
     }
   }
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java?rev=1453310&r1=1453309&r2=1453310&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java Wed Mar  6 12:22:01 2013
@@ -118,6 +118,8 @@ public class RecommenderJob extends Abst
     private int recommendationsPerUser;
     private float maxRating;
 
+    private RecommendedItemsWritable result = new RecommendedItemsWritable();
+
     @Override
     protected void setup(Context ctx) throws IOException, InterruptedException 
{
       recommendationsPerUser = 
ctx.getConfiguration().getInt(NUM_RECOMMENDATIONS,
@@ -151,19 +153,25 @@ public class RecommenderJob extends Abst
         public boolean apply(int itemID, Vector itemFeatures) {
           if (!alreadyRatedItems.contains(itemID)) {
             double predictedRating = U.get(userID).dot(itemFeatures);
-            topKItems.offer(new GenericRecommendedItem(itemID, (float) 
predictedRating));
+
+            // manual check to avoid an object instantiation per unknown item
+            if (topKItems.size() < recommendationsPerUser || (float) 
predictedRating > topKItems.peek().getValue()) {
+              topKItems.offer(new GenericRecommendedItem(itemID, (float) 
predictedRating));
+            }
           }
           return true;
         }
       });
 
-      List<RecommendedItem> recommendedItems = 
Lists.newArrayListWithExpectedSize(recommendationsPerUser);
-      for (RecommendedItem topItem : topKItems.retrieve()) {
-        recommendedItems.add(new GenericRecommendedItem(topItem.getItemID(), 
Math.min(topItem.getValue(), maxRating)));
-      }
-
       if (!topKItems.isEmpty()) {
-        ctx.write(userIDWritable, new 
RecommendedItemsWritable(recommendedItems));
+
+        List<RecommendedItem> recommendedItems = topKItems.retrieve();
+        for (RecommendedItem topItem : topKItems.retrieve()) {
+          topItem.capToMaxValue(maxRating);
+        }
+
+        result.set(recommendedItems);
+        ctx.write(userIDWritable, result);
       }
     }
   }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/recommender/GenericRecommendedItem.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/recommender/GenericRecommendedItem.java?rev=1453310&r1=1453309&r2=1453310&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/recommender/GenericRecommendedItem.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/recommender/GenericRecommendedItem.java Wed Mar  6 12:22:01 2013
@@ -32,7 +32,7 @@ import com.google.common.base.Preconditi
 public final class GenericRecommendedItem implements RecommendedItem, 
Serializable {
   
   private final long itemID;
-  private final float value;
+  private float value;
   
   /**
    * @throws IllegalArgumentException
@@ -53,7 +53,14 @@ public final class GenericRecommendedIte
   public float getValue() {
     return value;
   }
-  
+
+  @Override
+  public void capToMaxValue(float maxValue) {
+    if (value > maxValue) {
+      value = maxValue;
+    }
+  }
+
   @Override
   public String toString() {
     return "RecommendedItem[item:" + itemID + ", value:" + value + ']';

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/recommender/RecommendedItem.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/recommender/RecommendedItem.java?rev=1453310&r1=1453309&r2=1453310&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/recommender/RecommendedItem.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/recommender/RecommendedItem.java Wed Mar  6 12:22:01 2013
@@ -37,5 +37,11 @@ public interface RecommendedItem {
    * @return strength of the preference
    */
   float getValue();
-  
+
+  /**
+   * set a maximum preference value
+   *
+   * @param maxValue
+   */
+  void capToMaxValue(float maxValue);
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsReducer.java?rev=1453310&r1=1453309&r2=1453310&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsReducer.java Wed Mar  6 12:22:01 2013
@@ -28,10 +28,13 @@ import java.io.IOException;
 public class MergeVectorsReducer extends
     
Reducer<WritableComparable<?>,VectorWritable,WritableComparable<?>,VectorWritable>
 {
 
+  private VectorWritable result = new VectorWritable();
+
   @Override
   public void reduce(WritableComparable<?> key, Iterable<VectorWritable> 
vectors, Context ctx)
       throws IOException, InterruptedException {
     Vector merged = VectorWritable.merge(vectors.iterator()).get();
-    ctx.write(key, new VectorWritable(new 
SequentialAccessSparseVector(merged)));
+    result.set(new SequentialAccessSparseVector(merged));
+    ctx.write(key, result);
   }
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java?rev=1453310&r1=1453309&r2=1453310&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java Wed Mar  6 12:22:01 2013
@@ -41,6 +41,10 @@ public final class VectorWritable extend
   public VectorWritable() {
   }
 
+  public VectorWritable(boolean writesLaxPrecision) {
+    setWritesLaxPrecision(writesLaxPrecision);
+  }
+
   public VectorWritable(Vector vector) {
     this.vector = vector;
   }


Reply via email to