Author: dlyubimov
Date: Fri Sep 28 00:20:13 2012
New Revision: 1391272

URL: http://svn.apache.org/viewvc?rev=1391272&view=rev
Log:
    MAHOUT-1067 enhancements (named vector propagation, U*Sigma output option)

Modified:
    
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
    
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
    
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
    
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java
    
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/VJob.java

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java?rev=1391272&r1=1391271&r2=1391272&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
 Fri Sep 28 00:20:13 2012
@@ -49,6 +49,7 @@ import org.apache.mahout.common.iterator
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator;
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
 import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.NamedVector;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 import org.apache.mahout.math.function.Functions;
@@ -91,8 +92,8 @@ public final class BtJob {
   public static final String PROP_OUTER_PROD_BLOCK_HEIGHT =
     "ssvd.outerProdBlockHeight";
   public static final String PROP_RHAT_BROADCAST = "ssvd.rhat.broadcast";
-
   public static final String PROP_XI_PATH = "ssvdpca.xi.path";
+  public static final String PROP_NV = "ssvd.nv";
 
   static final double SPARSE_ZEROS_PCT_THRESHOLD = 0.1;
 
@@ -111,6 +112,7 @@ public final class BtJob {
     private Vector btRow;
     private SparseRowBlockAccumulator btCollector;
     private Context mapContext;
+    private boolean nv;
 
     // pca stuff
     private Vector sqAccum;
@@ -131,10 +133,9 @@ public final class BtJob {
 
       Vector qRow = qr.next();
       int kp = qRow.size();
-      qRowValue.set(qRow);
 
       // make sure Qs are inheriting A row labels.
-      outputQRow(key, qRowValue);
+      outputQRow(key, qRow, aRow);
 
       // MAHOUT-817
       if (computeSq) {
@@ -273,6 +274,9 @@ public final class BtJob {
       // MAHOUT-817
       computeSq = (conf.get(PROP_XI_PATH) != null);
 
+      // MAHOUT-1067
+      nv = conf.getBoolean(PROP_NV, false);
+
     }
 
     @Override
@@ -294,8 +298,13 @@ public final class BtJob {
     }
 
     @SuppressWarnings("unchecked")
-    private void outputQRow(Writable key, Writable value) throws IOException {
-      outputs.getCollector(OUTPUT_Q, null).collect(key, value);
+    private void outputQRow(Writable key, Vector qRow, Vector aRow ) throws IOException {
+      if (nv && (aRow instanceof NamedVector)) {
+        qRowValue.set(new NamedVector(qRow, ((NamedVector) aRow).getName()));
+      } else {
+        qRowValue.set(qRow);
+      }
+      outputs.getCollector(OUTPUT_Q, null).collect(key, qRowValue);
     }
   }
 
@@ -512,6 +521,12 @@ public final class BtJob {
                                      
org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
                                      IntWritable.class,
                                      VectorWritable.class);
+      /*
+       * MAHOUT-1067: if we are asked to output BBT products then named vector
+       * names should be propagated to Q too so that UJob could pick them up
+       * from there.
+       */
+      oldApiJob.setBoolean(PROP_NV, true);
     }
     if (xiPath != null) {
       // compute pca -related stuff as well

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java?rev=1391272&r1=1391271&r2=1391272&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
 Fri Sep 28 00:20:13 2012
@@ -57,12 +57,16 @@ public class SSVDCli extends AbstractJob
     addOption("computeU", "U", "compute U (true/false)", String.valueOf(true));
     addOption("uHalfSigma",
               "uhs",
-              "Compute U as UHat=U x pow(Sigma,0.5)",
+              "Compute U * Sigma^0.5",
+              String.valueOf(false));
+    addOption("uSigma",
+              "us",
+              "Compute U * Sigma",
               String.valueOf(false));
     addOption("computeV", "V", "compute V (true/false)", String.valueOf(true));
     addOption("vHalfSigma",
               "vhs",
-              "compute V as VHat= V x pow(Sigma,0.5)",
+              "compute V * Sigma^0.5",
               String.valueOf(false));
     addOption("reduceTasks",
               "t",
@@ -100,6 +104,7 @@ public class SSVDCli extends AbstractJob
     boolean computeU = Boolean.parseBoolean(getOption("computeU"));
     boolean computeV = Boolean.parseBoolean(getOption("computeV"));
     boolean cUHalfSigma = Boolean.parseBoolean(getOption("uHalfSigma"));
+    boolean cUSigma = Boolean.parseBoolean(getOption("uSigma"));
     boolean cVHalfSigma = Boolean.parseBoolean(getOption("vHalfSigma"));
     int reduceTasks = Integer.parseInt(getOption("reduceTasks"));
     boolean broadcast = Boolean.parseBoolean(getOption("broadcast"));
@@ -131,6 +136,7 @@ public class SSVDCli extends AbstractJob
     solver.setComputeV(computeV);
     solver.setcUHalfSigma(cUHalfSigma);
     solver.setcVHalfSigma(cVHalfSigma);
+    solver.setcUSigma(cUSigma);
     solver.setOuterBlockHeight(h);
     solver.setAbtBlockHeight(abh);
     solver.setQ(q);

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java?rev=1391272&r1=1391271&r2=1391272&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
 Fri Sep 28 00:20:13 2012
@@ -92,6 +92,10 @@ public class SSVDSolver {
   private boolean computeV = true;
   private String uPath;
   private String vPath;
+  private String uSigmaPath;
+  private String uHalfSigmaPath;
+  private String vSigmaPath;
+  private String vHalfSigmaPath;
   private int outerBlockHeight = 30000;
   private int abtBlockHeight = 200000;
 
@@ -106,7 +110,9 @@ public class SSVDSolver {
   private final int reduceTasks;
   private int minSplitSize = -1;
   private boolean cUHalfSigma;
+  private boolean cUSigma;
   private boolean cVHalfSigma;
+  private boolean cVSigma;
   private boolean overwrite;
   private boolean broadcast = true;
   private Path pcaMeanPath;
@@ -151,14 +157,6 @@ public class SSVDSolver {
     this.reduceTasks = reduceTasks;
   }
 
-  public void setcUHalfSigma(boolean cUHat) {
-    this.cUHalfSigma = cUHat;
-  }
-
-  public void setcVHalfSigma(boolean cVHat) {
-    this.cVHalfSigma = cVHat;
-  }
-
   public int getQ() {
     return q;
   }
@@ -175,6 +173,7 @@ public class SSVDSolver {
 
   /**
    * The setting controlling whether to compute U matrix of low rank SSVD.
+   * Default true.
    * 
    */
   public void setComputeU(boolean val) {
@@ -192,6 +191,37 @@ public class SSVDSolver {
   }
 
   /**
+   * 
+   * @param cUHat whether produce U*Sigma^0.5 as well (default false)
+   */
+  public void setcUHalfSigma(boolean cUHat) {
+    this.cUHalfSigma = cUHat;
+  }
+
+  /**
+   * 
+   * @param cVHat whether produce V*Sigma^0.5 as well (default false)
+   */
+  public void setcVHalfSigma(boolean cVHat) {
+    this.cVHalfSigma = cVHat;
+  }
+
+  /**
+   * 
+   * @param cUSigma whether produce U*Sigma output as well (default false)
+   */
+  public void setcUSigma(boolean cUSigma) {
+    this.cUSigma = cUSigma;
+  }
+
+  /**
+   * @param cVSigma whether produce V*Sigma output as well (default false)
+   */
+  public void setcVSigma(boolean cVSigma) {
+    this.cVSigma = cVSigma;
+  }
+
+  /**
    * Sometimes, if requested A blocks become larger than a split, we may need to
    * use that to ensure at least k+p rows of A get into a split. This is
    * requirement necessary to obtain orthonormalized Q blocks of SSVD.
@@ -232,6 +262,22 @@ public class SSVDSolver {
     return vPath;
   }
 
+  public String getuSigmaPath() {
+    return uSigmaPath;
+  }
+
+  public String getuHalfSigmaPath() {
+    return uHalfSigmaPath;
+  }
+
+  public String getvSigmaPath() {
+    return vSigmaPath;
+  }
+
+  public String getvHalfSigmaPath() {
+    return vHalfSigmaPath;
+  }
+
   public boolean isOverwrite() {
     return overwrite;
   }
@@ -334,7 +380,11 @@ public class SSVDSolver {
       Path uHatPath = new Path(outputPath, "UHat");
       Path svPath = new Path(outputPath, "Sigma");
       Path uPath = new Path(outputPath, "U");
+      Path uSigmaPath = new Path(outputPath, "USigma");
+      Path uHalfSigmaPath = new Path(outputPath, "UHalfSigma");
       Path vPath = new Path(outputPath, "V");
+      Path vHalfSigmaPath = new Path(outputPath, "VHalfSigma");
+      Path vSigmaPath = new Path(outputPath, "VSigma");
 
       Path pcaBasePath = new Path(outputPath, "pca");
 
@@ -391,9 +441,6 @@ public class SSVDSolver {
        * bit too many (I would be happy i that were ever the case though).
        */
 
-      //sbPath = new Path(pcaBasePath, "sb0");
-      //sqPath = new Path(pcaBasePath, "sq0");
-
       BtJob.run(conf,
                 inputPath,
                 qPath,
@@ -514,10 +561,38 @@ public class SSVDSolver {
                  k,
                  reduceTasks,
                  labelType,
-                 cUHalfSigma);
+                 OutputScalingEnum.NOSCALING);
         // actually this is map-only job anyway
       }
 
+      UJob uhsjob = null;
+      if (cUHalfSigma) {
+        uhsjob = new UJob();
+        uhsjob.run(conf,
+                 new Path(btPath, BtJob.OUTPUT_Q + "-*"),
+                 uHatPath,
+                 svPath,
+                 uHalfSigmaPath,
+                 k,
+                 reduceTasks,
+                 labelType,
+                 OutputScalingEnum.HALFSIGMA);
+      }
+
+      UJob usjob = null;
+      if (cUSigma) {
+        usjob = new UJob();
+        usjob.run(conf,
+                 new Path(btPath, BtJob.OUTPUT_Q + "-*"),
+                 uHatPath,
+                 svPath,
+                 uSigmaPath,
+                 k,
+                 reduceTasks,
+                 labelType,
+                 OutputScalingEnum.SIGMA);
+      }
+
       VJob vjob = null;
       if (computeV) {
         vjob = new VJob();
@@ -530,17 +605,63 @@ public class SSVDSolver {
                  vPath,
                  k,
                  reduceTasks,
-                 cVHalfSigma);
+                 OutputScalingEnum.NOSCALING);
+      }
+
+      VJob vhsjob = null;
+      if (cVHalfSigma) {
+        vhsjob = new VJob();
+        vhsjob.run(conf,
+                   new Path(btPath, BtJob.OUTPUT_BT + "-*"),
+                   pcaMeanPath,
+                   sqPath,
+                   uHatPath,
+                   svPath,
+                   vHalfSigmaPath,
+                   k,
+                   reduceTasks,
+                   OutputScalingEnum.HALFSIGMA);
+      }
+
+      VJob vsjob = null;
+      if (cVSigma) {
+        vsjob = new VJob();
+        vsjob.run(conf,
+                  new Path(btPath, BtJob.OUTPUT_BT + "-*"),
+                  pcaMeanPath,
+                  sqPath,
+                  uHatPath,
+                  svPath,
+                  vSigmaPath,
+                  k,
+                  reduceTasks,
+                  OutputScalingEnum.SIGMA);
       }
 
       if (ujob != null) {
         ujob.waitForCompletion();
         this.uPath = uPath.toString();
       }
+      if (uhsjob != null) {
+        uhsjob.waitForCompletion();
+        this.uHalfSigmaPath = uHalfSigmaPath.toString();
+      }
+      if (usjob != null) {
+        usjob.waitForCompletion();
+        this.uSigmaPath = uSigmaPath.toString();
+      }
       if (vjob != null) {
         vjob.waitForCompletion();
         this.vPath = vPath.toString();
       }
+      if (vhsjob != null) {
+        vhsjob.waitForCompletion();
+        this.vHalfSigmaPath = vHalfSigmaPath.toString();
+      }
+      if (vsjob != null) {
+        vsjob.waitForCompletion();
+        this.vSigmaPath = vSigmaPath.toString();
+      }
 
     } catch (InterruptedException exc) {
       throw new IOException("Interrupted", exc);
@@ -550,6 +671,9 @@ public class SSVDSolver {
     } finally {
       IOUtils.close(closeables);
     }
+  }
 
+  static enum OutputScalingEnum {
+    NOSCALING, SIGMA, HALFSIGMA
   }
 }

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java?rev=1391272&r1=1391271&r2=1391272&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java
 Fri Sep 28 00:20:13 2012
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.lib.o
 import org.apache.mahout.math.DenseMatrix;
 import org.apache.mahout.math.DenseVector;
 import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.NamedVector;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 import org.apache.mahout.math.function.Functions;
@@ -47,14 +48,14 @@ public class UJob {
   private static final String OUTPUT_U = "u";
   private static final String PROP_UHAT_PATH = "ssvd.uhat.path";
   private static final String PROP_SIGMA_PATH = "ssvd.sigma.path";
-  private static final String PROP_U_HALFSIGMA = "ssvd.u.halfsigma";
+  private static final String PROP_OUTPUT_SCALING = "ssvd.u.output.scaling";
   private static final String PROP_K = "ssvd.k";
 
   private Job job;
 
   public void run(Configuration conf, Path inputPathQ, Path inputUHatPath,
       Path sigmaPath, Path outputPath, int k, int numReduceTasks,
-      Class<? extends Writable> labelClass, boolean uHalfSigma)
+      Class<? extends Writable> labelClass, SSVDSolver.OutputScalingEnum outputScaling)
     throws ClassNotFoundException, InterruptedException, IOException {
 
     job = new Job(conf);
@@ -81,9 +82,7 @@ public class UJob {
 
     job.getConfiguration().set(PROP_UHAT_PATH, inputUHatPath.toString());
     job.getConfiguration().set(PROP_SIGMA_PATH, sigmaPath.toString());
-    if (uHalfSigma) {
-      job.getConfiguration().set(PROP_U_HALFSIGMA, "y");
-    }
+    job.getConfiguration().set(PROP_OUTPUT_SCALING, outputScaling.name());
     job.getConfiguration().setInt(PROP_K, k);
     job.setNumReduceTasks(0);
     job.submit();
@@ -125,6 +124,14 @@ public class UJob {
         }
       }
 
+      /*
+       * MAHOUT-1067: inherit A names too.
+       */
+      if (qRow instanceof NamedVector) {
+        uRowWritable.set(new NamedVector(uRow, ((NamedVector) qRow).getName()));
+      } else
+        uRowWritable.set(uRow);
+
       context.write(key, uRowWritable); // U inherits original A row labels.
     }
 
@@ -144,11 +151,18 @@ public class UJob {
       uRow = new DenseVector(k);
       uRowWritable = new VectorWritable(uRow);
 
-      if (context.getConfiguration().get(PROP_U_HALFSIGMA) != null) {
+      SSVDSolver.OutputScalingEnum outputScaling =
+        SSVDSolver.OutputScalingEnum.valueOf(context.getConfiguration()
+                                                    .get(PROP_OUTPUT_SCALING));
+      switch (outputScaling) {
+      case SIGMA:
+        sValues = SSVDHelper.loadVector(sigmaPath, context.getConfiguration());
+        break;
+      case HALFSIGMA:
         sValues = SSVDHelper.loadVector(sigmaPath, context.getConfiguration());
         sValues.assign(Functions.SQRT);
+        break;
       }
-
     }
 
   }

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/VJob.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/VJob.java?rev=1391272&r1=1391271&r2=1391272&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/VJob.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/VJob.java
 Fri Sep 28 00:20:13 2012
@@ -43,7 +43,7 @@ public class VJob {
   private static final String OUTPUT_V = "v";
   private static final String PROP_UHAT_PATH = "ssvd.uhat.path";
   private static final String PROP_SIGMA_PATH = "ssvd.sigma.path";
-  private static final String PROP_V_HALFSIGMA = "ssvd.v.halfsigma";
+  private static final String PROP_OUTPUT_SCALING = "ssvd.v.output.scaling";
   private static final String PROP_K = "ssvd.k";
   public static final String PROP_SQ_PATH = "ssvdpca.sq.path";
   public static final String PROP_XI_PATH = "ssvdpca.xi.path";
@@ -110,8 +110,17 @@ public class VJob {
       vRowWritable = new VectorWritable(vRow);
 
       sValues = SSVDHelper.loadVector(sigmaPath, conf);
-      if (conf.get(PROP_V_HALFSIGMA) != null) {
+      SSVDSolver.OutputScalingEnum outputScaling =
+        SSVDSolver.OutputScalingEnum.valueOf(context.getConfiguration()
+                                                    .get(PROP_OUTPUT_SCALING));
+      switch (outputScaling) {
+      case SIGMA:
+        sValues.assign(1.0);
+        break;
+      case HALFSIGMA:
+        sValues = SSVDHelper.loadVector(sigmaPath, context.getConfiguration());
         sValues.assign(Functions.SQRT);
+        break;
       }
 
       /*
@@ -141,7 +150,7 @@ public class VJob {
    * @param outputPath
    * @param k
    * @param numReduceTasks
-   * @param vHalfSigma
+   * @param outputScaling output scaling: apply Sigma, or Sigma^0.5, or none
    * @throws ClassNotFoundException
    * @throws InterruptedException
    * @throws IOException
@@ -157,7 +166,7 @@ public class VJob {
                   Path outputPath,
                   int k,
                   int numReduceTasks,
-                  boolean vHalfSigma) throws ClassNotFoundException,
+                  SSVDSolver.OutputScalingEnum outputScaling ) throws ClassNotFoundException,
     InterruptedException, IOException {
 
     job = new Job(conf);
@@ -186,9 +195,7 @@ public class VJob {
 
     job.getConfiguration().set(PROP_UHAT_PATH, inputUHatPath.toString());
     job.getConfiguration().set(PROP_SIGMA_PATH, inputSigmaPath.toString());
-    if (vHalfSigma) {
-      job.getConfiguration().set(PROP_V_HALFSIGMA, "y");
-    }
+    job.getConfiguration().set(PROP_OUTPUT_SCALING, outputScaling.name());
     job.getConfiguration().setInt(PROP_K, k);
     job.setNumReduceTasks(0);
 


Reply via email to