Author: ssc
Date: Wed Mar 6 12:22:01 2013
New Revision: 1453310
URL: http://svn.apache.org/r1453310
Log:
MAHOUT-1151 Object reuse in distributed ALS
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/common/FixedSizePriorityQueue.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/recommender/GenericRecommendedItem.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/recommender/RecommendedItem.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/common/FixedSizePriorityQueue.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/common/FixedSizePriorityQueue.java?rev=1453310&r1=1453309&r2=1453310&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/common/FixedSizePriorityQueue.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/common/FixedSizePriorityQueue.java Wed Mar 6 12:22:01 2013
@@ -71,7 +71,7 @@ abstract class FixedSizePriorityQueue<T>
return topItems;
}
- protected T peek() {
+ public T peek() {
return queue.peek();
}
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java?rev=1453310&r1=1453309&r2=1453310&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java Wed Mar 6 12:22:01 2013
@@ -201,6 +201,12 @@ public class ParallelALSFactorizationJob
}
static class ItemRatingVectorsMapper extends
Mapper<LongWritable,Text,IntWritable,VectorWritable> {
+
+ private IntWritable itemIDWritable = new IntWritable();
+ private VectorWritable ratingsWritable = new VectorWritable(true);
+
+ private Vector ratings = new
SequentialAccessSparseVector(Integer.MAX_VALUE, 1);
+
@Override
protected void map(LongWritable offset, Text line, Context ctx) throws
IOException, InterruptedException {
String[] tokens = TasteHadoopUtils.splitPrefTokens(line.toString());
@@ -208,10 +214,15 @@ public class ParallelALSFactorizationJob
int itemID = Integer.parseInt(tokens[1]);
float rating = Float.parseFloat(tokens[2]);
- Vector ratings = new RandomAccessSparseVector(Integer.MAX_VALUE, 1);
- ratings.set(userID, rating);
+ ratings.setQuick(userID, rating);
- ctx.write(new IntWritable(itemID), new VectorWritable(ratings, true));
+ itemIDWritable.set(itemID);
+ ratingsWritable.set(ratings);
+
+ ctx.write(itemIDWritable, ratingsWritable);
+
+ // prepare instance for reuse
+ ratings.setQuick(userID, 0.0d);
}
}
@@ -240,6 +251,8 @@ public class ParallelALSFactorizationJob
private int numFeatures;
private OpenIntObjectHashMap<Vector> UorM;
+ private VectorWritable uiOrmjWritable = new VectorWritable();
+
@Override
protected void setup(Mapper.Context ctx) throws IOException,
InterruptedException {
lambda = Double.parseDouble(ctx.getConfiguration().get(LAMBDA));
@@ -254,7 +267,8 @@ public class ParallelALSFactorizationJob
@Override
protected void map(IntWritable userOrItemID, VectorWritable
ratingsWritable, Context ctx)
throws IOException, InterruptedException {
- Vector ratings = new SequentialAccessSparseVector(ratingsWritable.get());
+ Vector ratings = ratingsWritable.get();
+
List<Vector> featureVectors = Lists.newArrayList();
Iterator<Vector.Element> interactions = ratings.iterateNonZero();
while (interactions.hasNext()) {
@@ -264,7 +278,8 @@ public class ParallelALSFactorizationJob
Vector uiOrmj = AlternatingLeastSquaresSolver.solve(featureVectors,
ratings, lambda, numFeatures);
- ctx.write(userOrItemID, new VectorWritable(uiOrmj));
+ uiOrmjWritable.set(uiOrmj);
+ ctx.write(userOrItemID, uiOrmjWritable);
}
}
@@ -272,6 +287,8 @@ public class ParallelALSFactorizationJob
private ImplicitFeedbackAlternatingLeastSquaresSolver solver;
+ private VectorWritable uiOrmjWritable = new VectorWritable();
+
@Override
protected void setup(Mapper.Context ctx) throws IOException,
InterruptedException {
double lambda = Double.parseDouble(ctx.getConfiguration().get(LAMBDA));
@@ -289,15 +306,20 @@ public class ParallelALSFactorizationJob
@Override
protected void map(IntWritable userOrItemID, VectorWritable
ratingsWritable, Context ctx)
throws IOException, InterruptedException {
- Vector ratings = new SequentialAccessSparseVector(ratingsWritable.get());
- Vector uiOrmj = solver.solve(ratings);
+ Vector uiOrmj = solver.solve(ratingsWritable.get());
- ctx.write(userOrItemID, new VectorWritable(uiOrmj));
+ uiOrmjWritable.set(uiOrmj);
+ ctx.write(userOrItemID, uiOrmjWritable);
}
}
static class AverageRatingMapper extends
Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+ private IntWritable firstIndex = new IntWritable(0);
+ private Vector featureVector = new
RandomAccessSparseVector(Integer.MAX_VALUE, 1);
+ private VectorWritable featureVectorWritable = new VectorWritable();
+
@Override
protected void map(IntWritable r, VectorWritable v, Context ctx) throws
IOException, InterruptedException {
RunningAverage avg = new FullRunningAverage();
@@ -305,9 +327,13 @@ public class ParallelALSFactorizationJob
while (elements.hasNext()) {
avg.addDatum(elements.next().get());
}
- Vector vector = new RandomAccessSparseVector(Integer.MAX_VALUE, 1);
- vector.setQuick(r.get(), avg.getAverage());
- ctx.write(new IntWritable(0), new VectorWritable(vector));
+
+ featureVector.setQuick(r.get(), avg.getAverage());
+ featureVectorWritable.set(featureVector);
+ ctx.write(firstIndex, featureVectorWritable);
+
+ // prepare instance for reuse
+ featureVector.setQuick(r.get(), 0.0d);
}
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java?rev=1453310&r1=1453309&r2=1453310&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java Wed Mar 6 12:22:01 2013
@@ -118,6 +118,8 @@ public class RecommenderJob extends Abst
private int recommendationsPerUser;
private float maxRating;
+ private RecommendedItemsWritable result = new RecommendedItemsWritable();
+
@Override
protected void setup(Context ctx) throws IOException, InterruptedException
{
recommendationsPerUser =
ctx.getConfiguration().getInt(NUM_RECOMMENDATIONS,
@@ -151,19 +153,25 @@ public class RecommenderJob extends Abst
public boolean apply(int itemID, Vector itemFeatures) {
if (!alreadyRatedItems.contains(itemID)) {
double predictedRating = U.get(userID).dot(itemFeatures);
- topKItems.offer(new GenericRecommendedItem(itemID, (float)
predictedRating));
+
+ // manual check to avoid an object instantiation per unknown item
+ if (topKItems.size() < recommendationsPerUser || (float)
predictedRating > topKItems.peek().getValue()) {
+ topKItems.offer(new GenericRecommendedItem(itemID, (float)
predictedRating));
+ }
}
return true;
}
});
- List<RecommendedItem> recommendedItems =
Lists.newArrayListWithExpectedSize(recommendationsPerUser);
- for (RecommendedItem topItem : topKItems.retrieve()) {
- recommendedItems.add(new GenericRecommendedItem(topItem.getItemID(),
Math.min(topItem.getValue(), maxRating)));
- }
-
if (!topKItems.isEmpty()) {
- ctx.write(userIDWritable, new
RecommendedItemsWritable(recommendedItems));
+
+ List<RecommendedItem> recommendedItems = topKItems.retrieve();
+ for (RecommendedItem topItem : topKItems.retrieve()) {
+ topItem.capToMaxValue(maxRating);
+ }
+
+ result.set(recommendedItems);
+ ctx.write(userIDWritable, result);
}
}
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/recommender/GenericRecommendedItem.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/recommender/GenericRecommendedItem.java?rev=1453310&r1=1453309&r2=1453310&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/recommender/GenericRecommendedItem.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/recommender/GenericRecommendedItem.java Wed Mar 6 12:22:01 2013
@@ -32,7 +32,7 @@ import com.google.common.base.Preconditi
public final class GenericRecommendedItem implements RecommendedItem,
Serializable {
private final long itemID;
- private final float value;
+ private float value;
/**
* @throws IllegalArgumentException
@@ -53,7 +53,14 @@ public final class GenericRecommendedIte
public float getValue() {
return value;
}
-
+
+ @Override
+ public void capToMaxValue(float maxValue) {
+ if (value > maxValue) {
+ value = maxValue;
+ }
+ }
+
@Override
public String toString() {
return "RecommendedItem[item:" + itemID + ", value:" + value + ']';
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/recommender/RecommendedItem.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/recommender/RecommendedItem.java?rev=1453310&r1=1453309&r2=1453310&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/recommender/RecommendedItem.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/recommender/RecommendedItem.java Wed Mar 6 12:22:01 2013
@@ -37,5 +37,11 @@ public interface RecommendedItem {
* @return strength of the preference
*/
float getValue();
-
+
+ /**
+ * Caps the preference value so that it does not exceed the given maximum.
+ *
+ * @param maxValue upper bound; a larger preference value is lowered to it
+ */
+ void capToMaxValue(float maxValue);
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsReducer.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsReducer.java?rev=1453310&r1=1453309&r2=1453310&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsReducer.java Wed Mar 6 12:22:01 2013
@@ -28,10 +28,13 @@ import java.io.IOException;
public class MergeVectorsReducer extends
Reducer<WritableComparable<?>,VectorWritable,WritableComparable<?>,VectorWritable>
{
+ private VectorWritable result = new VectorWritable();
+
@Override
public void reduce(WritableComparable<?> key, Iterable<VectorWritable>
vectors, Context ctx)
throws IOException, InterruptedException {
Vector merged = VectorWritable.merge(vectors.iterator()).get();
- ctx.write(key, new VectorWritable(new
SequentialAccessSparseVector(merged)));
+ result.set(new SequentialAccessSparseVector(merged));
+ ctx.write(key, result);
}
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java?rev=1453310&r1=1453309&r2=1453310&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java Wed Mar 6 12:22:01 2013
@@ -41,6 +41,10 @@ public final class VectorWritable extend
public VectorWritable() {
}
+ public VectorWritable(boolean writesLaxPrecision) {
+ setWritesLaxPrecision(writesLaxPrecision);
+ }
+
public VectorWritable(Vector vector) {
this.vector = vector;
}