Author: dlyubimov
Date: Fri Sep 28 00:20:13 2012
New Revision: 1391272
URL: http://svn.apache.org/viewvc?rev=1391272&view=rev
Log:
MAHOUT-1067 enhancements (named vector propagation, U*Sigma output option)
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/VJob.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java?rev=1391272&r1=1391271&r2=1391272&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
Fri Sep 28 00:20:13 2012
@@ -49,6 +49,7 @@ import org.apache.mahout.common.iterator
import
org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator;
import
org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.NamedVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.function.Functions;
@@ -91,8 +92,8 @@ public final class BtJob {
public static final String PROP_OUTER_PROD_BLOCK_HEIGHT =
"ssvd.outerProdBlockHeight";
public static final String PROP_RHAT_BROADCAST = "ssvd.rhat.broadcast";
-
public static final String PROP_XI_PATH = "ssvdpca.xi.path";
+ public static final String PROP_NV = "ssvd.nv";
static final double SPARSE_ZEROS_PCT_THRESHOLD = 0.1;
@@ -111,6 +112,7 @@ public final class BtJob {
private Vector btRow;
private SparseRowBlockAccumulator btCollector;
private Context mapContext;
+ private boolean nv;
// pca stuff
private Vector sqAccum;
@@ -131,10 +133,9 @@ public final class BtJob {
Vector qRow = qr.next();
int kp = qRow.size();
- qRowValue.set(qRow);
// make sure Qs are inheriting A row labels.
- outputQRow(key, qRowValue);
+ outputQRow(key, qRow, aRow);
// MAHOUT-817
if (computeSq) {
@@ -273,6 +274,9 @@ public final class BtJob {
// MAHOUT-817
computeSq = (conf.get(PROP_XI_PATH) != null);
+ // MAHOUT-1067
+ nv = conf.getBoolean(PROP_NV, false);
+
}
@Override
@@ -294,8 +298,13 @@ public final class BtJob {
}
@SuppressWarnings("unchecked")
- private void outputQRow(Writable key, Writable value) throws IOException {
- outputs.getCollector(OUTPUT_Q, null).collect(key, value);
+ private void outputQRow(Writable key, Vector qRow, Vector aRow ) throws
IOException {
+ if (nv && (aRow instanceof NamedVector)) {
+ qRowValue.set(new NamedVector(qRow, ((NamedVector) aRow).getName()));
+ } else {
+ qRowValue.set(qRow);
+ }
+ outputs.getCollector(OUTPUT_Q, null).collect(key, qRowValue);
}
}
@@ -512,6 +521,12 @@ public final class BtJob {
org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
IntWritable.class,
VectorWritable.class);
+ /*
+ * MAHOUT-1067: if we are asked to output BBT products then named vector
+ * names should be propagated to Q too so that UJob could pick them up
+ * from there.
+ */
+ oldApiJob.setBoolean(PROP_NV, true);
}
if (xiPath != null) {
// compute pca -related stuff as well
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java?rev=1391272&r1=1391271&r2=1391272&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
Fri Sep 28 00:20:13 2012
@@ -57,12 +57,16 @@ public class SSVDCli extends AbstractJob
addOption("computeU", "U", "compute U (true/false)", String.valueOf(true));
addOption("uHalfSigma",
"uhs",
- "Compute U as UHat=U x pow(Sigma,0.5)",
+ "Compute U * Sigma^0.5",
+ String.valueOf(false));
+ addOption("uSigma",
+ "us",
+ "Compute U * Sigma",
String.valueOf(false));
addOption("computeV", "V", "compute V (true/false)", String.valueOf(true));
addOption("vHalfSigma",
"vhs",
- "compute V as VHat= V x pow(Sigma,0.5)",
+ "compute V * Sigma^0.5",
String.valueOf(false));
addOption("reduceTasks",
"t",
@@ -100,6 +104,7 @@ public class SSVDCli extends AbstractJob
boolean computeU = Boolean.parseBoolean(getOption("computeU"));
boolean computeV = Boolean.parseBoolean(getOption("computeV"));
boolean cUHalfSigma = Boolean.parseBoolean(getOption("uHalfSigma"));
+ boolean cUSigma = Boolean.parseBoolean(getOption("uSigma"));
boolean cVHalfSigma = Boolean.parseBoolean(getOption("vHalfSigma"));
int reduceTasks = Integer.parseInt(getOption("reduceTasks"));
boolean broadcast = Boolean.parseBoolean(getOption("broadcast"));
@@ -131,6 +136,7 @@ public class SSVDCli extends AbstractJob
solver.setComputeV(computeV);
solver.setcUHalfSigma(cUHalfSigma);
solver.setcVHalfSigma(cVHalfSigma);
+ solver.setcUSigma(cUSigma);
solver.setOuterBlockHeight(h);
solver.setAbtBlockHeight(abh);
solver.setQ(q);
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java?rev=1391272&r1=1391271&r2=1391272&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
Fri Sep 28 00:20:13 2012
@@ -92,6 +92,10 @@ public class SSVDSolver {
private boolean computeV = true;
private String uPath;
private String vPath;
+ private String uSigmaPath;
+ private String uHalfSigmaPath;
+ private String vSigmaPath;
+ private String vHalfSigmaPath;
private int outerBlockHeight = 30000;
private int abtBlockHeight = 200000;
@@ -106,7 +110,9 @@ public class SSVDSolver {
private final int reduceTasks;
private int minSplitSize = -1;
private boolean cUHalfSigma;
+ private boolean cUSigma;
private boolean cVHalfSigma;
+ private boolean cVSigma;
private boolean overwrite;
private boolean broadcast = true;
private Path pcaMeanPath;
@@ -151,14 +157,6 @@ public class SSVDSolver {
this.reduceTasks = reduceTasks;
}
- public void setcUHalfSigma(boolean cUHat) {
- this.cUHalfSigma = cUHat;
- }
-
- public void setcVHalfSigma(boolean cVHat) {
- this.cVHalfSigma = cVHat;
- }
-
public int getQ() {
return q;
}
@@ -175,6 +173,7 @@ public class SSVDSolver {
/**
* The setting controlling whether to compute U matrix of low rank SSVD.
+ * Default true.
*
*/
public void setComputeU(boolean val) {
@@ -192,6 +191,37 @@ public class SSVDSolver {
}
/**
+ *
+ * @param cUHat whether produce U*Sigma^0.5 as well (default false)
+ */
+ public void setcUHalfSigma(boolean cUHat) {
+ this.cUHalfSigma = cUHat;
+ }
+
+ /**
+ *
+ * @param cVHat whether produce V*Sigma^0.5 as well (default false)
+ */
+ public void setcVHalfSigma(boolean cVHat) {
+ this.cVHalfSigma = cVHat;
+ }
+
+ /**
+ *
+ * @param cUSigma whether produce U*Sigma output as well (default false)
+ */
+ public void setcUSigma(boolean cUSigma) {
+ this.cUSigma = cUSigma;
+ }
+
+ /**
+ * @param cVSigma whether produce V*Sigma output as well (default false)
+ */
+ public void setcVSigma(boolean cVSigma) {
+ this.cVSigma = cVSigma;
+ }
+
+ /**
* Sometimes, if requested A blocks become larger than a split, we may need
to
* use that to ensure at least k+p rows of A get into a split. This is
* requirement necessary to obtain orthonormalized Q blocks of SSVD.
@@ -232,6 +262,22 @@ public class SSVDSolver {
return vPath;
}
+ public String getuSigmaPath() {
+ return uSigmaPath;
+ }
+
+ public String getuHalfSigmaPath() {
+ return uHalfSigmaPath;
+ }
+
+ public String getvSigmaPath() {
+ return vSigmaPath;
+ }
+
+ public String getvHalfSigmaPath() {
+ return vHalfSigmaPath;
+ }
+
public boolean isOverwrite() {
return overwrite;
}
@@ -334,7 +380,11 @@ public class SSVDSolver {
Path uHatPath = new Path(outputPath, "UHat");
Path svPath = new Path(outputPath, "Sigma");
Path uPath = new Path(outputPath, "U");
+ Path uSigmaPath = new Path(outputPath, "USigma");
+ Path uHalfSigmaPath = new Path(outputPath, "UHalfSigma");
Path vPath = new Path(outputPath, "V");
+ Path vHalfSigmaPath = new Path(outputPath, "VHalfSigma");
+ Path vSigmaPath = new Path(outputPath, "VSigma");
Path pcaBasePath = new Path(outputPath, "pca");
@@ -391,9 +441,6 @@ public class SSVDSolver {
* bit too many (I would be happy i that were ever the case though).
*/
- //sbPath = new Path(pcaBasePath, "sb0");
- //sqPath = new Path(pcaBasePath, "sq0");
-
BtJob.run(conf,
inputPath,
qPath,
@@ -514,10 +561,38 @@ public class SSVDSolver {
k,
reduceTasks,
labelType,
- cUHalfSigma);
+ OutputScalingEnum.NOSCALING);
// actually this is map-only job anyway
}
+ UJob uhsjob = null;
+ if (cUHalfSigma) {
+ uhsjob = new UJob();
+ uhsjob.run(conf,
+ new Path(btPath, BtJob.OUTPUT_Q + "-*"),
+ uHatPath,
+ svPath,
+ uHalfSigmaPath,
+ k,
+ reduceTasks,
+ labelType,
+ OutputScalingEnum.HALFSIGMA);
+ }
+
+ UJob usjob = null;
+ if (cUSigma) {
+ usjob = new UJob();
+ usjob.run(conf,
+ new Path(btPath, BtJob.OUTPUT_Q + "-*"),
+ uHatPath,
+ svPath,
+ uSigmaPath,
+ k,
+ reduceTasks,
+ labelType,
+ OutputScalingEnum.SIGMA);
+ }
+
VJob vjob = null;
if (computeV) {
vjob = new VJob();
@@ -530,17 +605,63 @@ public class SSVDSolver {
vPath,
k,
reduceTasks,
- cVHalfSigma);
+ OutputScalingEnum.NOSCALING);
+ }
+
+ VJob vhsjob = null;
+ if (cVHalfSigma) {
+ vhsjob = new VJob();
+ vhsjob.run(conf,
+ new Path(btPath, BtJob.OUTPUT_BT + "-*"),
+ pcaMeanPath,
+ sqPath,
+ uHatPath,
+ svPath,
+ vHalfSigmaPath,
+ k,
+ reduceTasks,
+ OutputScalingEnum.HALFSIGMA);
+ }
+
+ VJob vsjob = null;
+ if (cVSigma) {
+ vsjob = new VJob();
+ vsjob.run(conf,
+ new Path(btPath, BtJob.OUTPUT_BT + "-*"),
+ pcaMeanPath,
+ sqPath,
+ uHatPath,
+ svPath,
+ vSigmaPath,
+ k,
+ reduceTasks,
+ OutputScalingEnum.SIGMA);
}
if (ujob != null) {
ujob.waitForCompletion();
this.uPath = uPath.toString();
}
+ if (uhsjob != null) {
+ uhsjob.waitForCompletion();
+ this.uHalfSigmaPath = uHalfSigmaPath.toString();
+ }
+ if (usjob != null) {
+ usjob.waitForCompletion();
+ this.uSigmaPath = uSigmaPath.toString();
+ }
if (vjob != null) {
vjob.waitForCompletion();
this.vPath = vPath.toString();
}
+ if (vhsjob != null) {
+ vhsjob.waitForCompletion();
+ this.vHalfSigmaPath = vHalfSigmaPath.toString();
+ }
+ if (vsjob != null) {
+ vsjob.waitForCompletion();
+ this.vSigmaPath = vSigmaPath.toString();
+ }
} catch (InterruptedException exc) {
throw new IOException("Interrupted", exc);
@@ -550,6 +671,9 @@ public class SSVDSolver {
} finally {
IOUtils.close(closeables);
}
+ }
+ static enum OutputScalingEnum {
+ NOSCALING, SIGMA, HALFSIGMA
}
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java?rev=1391272&r1=1391271&r2=1391272&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java
Fri Sep 28 00:20:13 2012
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.lib.o
import org.apache.mahout.math.DenseMatrix;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.NamedVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.function.Functions;
@@ -47,14 +48,14 @@ public class UJob {
private static final String OUTPUT_U = "u";
private static final String PROP_UHAT_PATH = "ssvd.uhat.path";
private static final String PROP_SIGMA_PATH = "ssvd.sigma.path";
- private static final String PROP_U_HALFSIGMA = "ssvd.u.halfsigma";
+ private static final String PROP_OUTPUT_SCALING = "ssvd.u.output.scaling";
private static final String PROP_K = "ssvd.k";
private Job job;
public void run(Configuration conf, Path inputPathQ, Path inputUHatPath,
Path sigmaPath, Path outputPath, int k, int numReduceTasks,
- Class<? extends Writable> labelClass, boolean uHalfSigma)
+ Class<? extends Writable> labelClass, SSVDSolver.OutputScalingEnum
outputScaling)
throws ClassNotFoundException, InterruptedException, IOException {
job = new Job(conf);
@@ -81,9 +82,7 @@ public class UJob {
job.getConfiguration().set(PROP_UHAT_PATH, inputUHatPath.toString());
job.getConfiguration().set(PROP_SIGMA_PATH, sigmaPath.toString());
- if (uHalfSigma) {
- job.getConfiguration().set(PROP_U_HALFSIGMA, "y");
- }
+ job.getConfiguration().set(PROP_OUTPUT_SCALING, outputScaling.name());
job.getConfiguration().setInt(PROP_K, k);
job.setNumReduceTasks(0);
job.submit();
@@ -125,6 +124,14 @@ public class UJob {
}
}
+ /*
+ * MAHOUT-1067: inherit A names too.
+ */
+ if (qRow instanceof NamedVector) {
+ uRowWritable.set(new NamedVector(uRow, ((NamedVector)
qRow).getName()));
+ } else
+ uRowWritable.set(uRow);
+
context.write(key, uRowWritable); // U inherits original A row labels.
}
@@ -144,11 +151,18 @@ public class UJob {
uRow = new DenseVector(k);
uRowWritable = new VectorWritable(uRow);
- if (context.getConfiguration().get(PROP_U_HALFSIGMA) != null) {
+ SSVDSolver.OutputScalingEnum outputScaling =
+ SSVDSolver.OutputScalingEnum.valueOf(context.getConfiguration()
+ .get(PROP_OUTPUT_SCALING));
+ switch (outputScaling) {
+ case SIGMA:
+ sValues = SSVDHelper.loadVector(sigmaPath, context.getConfiguration());
+ break;
+ case HALFSIGMA:
sValues = SSVDHelper.loadVector(sigmaPath, context.getConfiguration());
sValues.assign(Functions.SQRT);
+ break;
}
-
}
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/VJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/VJob.java?rev=1391272&r1=1391271&r2=1391272&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/VJob.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/VJob.java
Fri Sep 28 00:20:13 2012
@@ -43,7 +43,7 @@ public class VJob {
private static final String OUTPUT_V = "v";
private static final String PROP_UHAT_PATH = "ssvd.uhat.path";
private static final String PROP_SIGMA_PATH = "ssvd.sigma.path";
- private static final String PROP_V_HALFSIGMA = "ssvd.v.halfsigma";
+ private static final String PROP_OUTPUT_SCALING = "ssvd.v.output.scaling";
private static final String PROP_K = "ssvd.k";
public static final String PROP_SQ_PATH = "ssvdpca.sq.path";
public static final String PROP_XI_PATH = "ssvdpca.xi.path";
@@ -110,8 +110,17 @@ public class VJob {
vRowWritable = new VectorWritable(vRow);
sValues = SSVDHelper.loadVector(sigmaPath, conf);
- if (conf.get(PROP_V_HALFSIGMA) != null) {
+ SSVDSolver.OutputScalingEnum outputScaling =
+ SSVDSolver.OutputScalingEnum.valueOf(context.getConfiguration()
+ .get(PROP_OUTPUT_SCALING));
+ switch (outputScaling) {
+ case SIGMA:
+ sValues.assign(1.0);
+ break;
+ case HALFSIGMA:
+ sValues = SSVDHelper.loadVector(sigmaPath, context.getConfiguration());
sValues.assign(Functions.SQRT);
+ break;
}
/*
@@ -141,7 +150,7 @@ public class VJob {
* @param outputPath
* @param k
* @param numReduceTasks
- * @param vHalfSigma
+ * @param outputScaling output scaling: apply Sigma, or Sigma^0.5, or none
* @throws ClassNotFoundException
* @throws InterruptedException
* @throws IOException
@@ -157,7 +166,7 @@ public class VJob {
Path outputPath,
int k,
int numReduceTasks,
- boolean vHalfSigma) throws ClassNotFoundException,
+ SSVDSolver.OutputScalingEnum outputScaling ) throws
ClassNotFoundException,
InterruptedException, IOException {
job = new Job(conf);
@@ -186,9 +195,7 @@ public class VJob {
job.getConfiguration().set(PROP_UHAT_PATH, inputUHatPath.toString());
job.getConfiguration().set(PROP_SIGMA_PATH, inputSigmaPath.toString());
- if (vHalfSigma) {
- job.getConfiguration().set(PROP_V_HALFSIGMA, "y");
- }
+ job.getConfiguration().set(PROP_OUTPUT_SCALING, outputScaling.name());
job.getConfiguration().setInt(PROP_K, k);
job.setNumReduceTasks(0);