Author: jeastman
Date: Mon Sep 13 16:48:23 2010
New Revision: 996599
URL: http://svn.apache.org/viewvc?rev=996599&view=rev
Log:
MAHOUT-294: Factored a run(...) method out of EigenVerificationJob.run(String[]) so
it can be called from TestClusterDumper. Made private a few of its other methods
that were never used outside the class. All tests run.
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java
mahout/trunk/utils/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java?rev=996599&r1=996598&r2=996599&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java Mon Sep 13 16:48:23 2010
@@ -17,6 +17,14 @@
package org.apache.mahout.math.hadoop.decomposer;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -40,15 +48,6 @@ import org.apache.mahout.math.hadoop.Dis
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-
/**
* <p>Class for taking the output of an eigendecomposition (specified as a
Path location), and verifies correctness,
* in terms of the following: if you have a vector e, and a matrix m, then let
e' = m.timesSquared(v); the error
@@ -69,24 +68,35 @@ import java.util.Map;
*/
public class EigenVerificationJob extends AbstractJob {
+ public static final String LARGEST_CLEAN_EIGENS = "largestCleanEigens";
+
private static final Logger log =
LoggerFactory.getLogger(EigenVerificationJob.class);
private SingularVectorVerifier eigenVerifier;
+
private VectorIterable eigensToVerify;
+
private VectorIterable corpus;
+
private double maxError;
+
private double minEigenValue;
+
private boolean loadEigensInMemory;
+
private Path tmpOut;
+
private Path outPath;
+ private JobConf conf;
+
public void setEigensToVerify(VectorIterable eigens) {
eigensToVerify = eigens;
}
@Override
public int run(String[] args) throws Exception {
- Map<String,String> argMap = handleArgs(args);
+ Map<String, String> argMap = handleArgs(args);
if (argMap == null) {
return -1;
} else if (argMap.isEmpty()) {
@@ -95,15 +105,51 @@ public class EigenVerificationJob extend
outPath = getOutputPath();
tmpOut = new Path(outPath, "tmp");
- if (argMap.get("--eigenInput") != null && eigensToVerify == null) {
- prepareEigens(new Path(argMap.get("--eigenInput")),
argMap.get("--inMemory") != null);
+ Path eigenInput = null;
+ boolean inMemory = false;
+ if (argMap.get("--eigenInput") != null) {
+ eigenInput = new Path(argMap.get("--eigenInput"));
+ inMemory = argMap.get("--inMemory") != null;
}
+ Path corpusInput = new Path(argMap.get("--corpusInput"));
- maxError = Double.parseDouble(argMap.get("--maxError"));
- minEigenValue = Double.parseDouble(argMap.get("--minEigenvalue"));
+ run(corpusInput, eigenInput, outPath, tmpOut,
Double.parseDouble(argMap.get("--maxError")), Double.parseDouble(argMap
+ .get("--minEigenvalue")), inMemory, new JobConf(getConf()));
- DistributedRowMatrix c = new DistributedRowMatrix(new
Path(argMap.get("--corpusInput")), tmpOut, 1, 1);
- c.configure(new JobConf(getConf()));
+ return 0;
+ }
+
+ /**
+ * Run the job with the given arguments
+ * @param corpusInput the corpus input Path
+ * @param eigenInput the eigenvector input Path
+ * @param output the output Path
+ * @param tempOut temporary output Path
+ * @param maxError a double representing the maximum error
+ * @param minEigenValue a double representing the minimum eigenvalue
+ * @param inMemory a boolean requesting in-memory preparation
+ * @param config the JobConf to use, or null if a default is ok (saves
referencing JobConf in calling classes unless needed)
+ * @throws IOException
+ */
+ public void run(Path corpusInput,
+ Path eigenInput,
+ Path output,
+ Path tempOut,
+ double maxError,
+ double minEigenValue,
+ boolean inMemory,
+ JobConf config) throws IOException {
+ this.outPath = output;
+ this.tmpOut = tempOut;
+ this.maxError = maxError;
+ this.minEigenValue = minEigenValue;
+ this.conf = config != null ? config : new JobConf();
+
+ if (eigenInput != null && eigensToVerify == null) {
+ prepareEigens(eigenInput, inMemory);
+ }
+ DistributedRowMatrix c = new DistributedRowMatrix(corpusInput, tempOut, 1,
1);
+ c.configure(conf);
corpus = c;
// set up eigenverifier and orthoverifier TODO: allow multithreaded
execution
@@ -113,21 +159,20 @@ public class EigenVerificationJob extend
//VectorIterable pairwiseInnerProducts = computePairwiseInnerProducts();
computePairwiseInnerProducts();
- Map<MatrixSlice,EigenStatus> eigenMetaData = verifyEigens();
+ Map<MatrixSlice, EigenStatus> eigenMetaData = verifyEigens();
- List<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta =
pruneEigens(eigenMetaData);
+ List<Map.Entry<MatrixSlice, EigenStatus>> prunedEigenMeta =
pruneEigens(eigenMetaData);
saveCleanEigens(prunedEigenMeta);
-
- return 0;
}
- public Map<String,String> handleArgs(String[] args) {
+ private Map<String, String> handleArgs(String[] args) {
addOutputOption();
- addOption("eigenInput", "ei",
- "The Path for purported eigenVector input files
(SequenceFile<WritableComparable,VectorWritable>.", null);
- addOption("corpusInput", "ci",
- "The Path for corpus input files
(SequenceFile<WritableComparable,VectorWritable>.");
+ addOption("eigenInput",
+ "ei",
+ "The Path for purported eigenVector input files
(SequenceFile<WritableComparable,VectorWritable>.",
+ null);
+ addOption("corpusInput", "ci", "The Path for corpus input files
(SequenceFile<WritableComparable,VectorWritable>.");
addOption(DefaultOptionCreator.outputOption().create());
addOption(DefaultOptionCreator.helpOption());
addOption("inMemory", "mem", "Buffer eigen matrix into memory (if you have
enough!)", "false");
@@ -137,23 +182,19 @@ public class EigenVerificationJob extend
return parseArguments(args);
}
- public VectorIterable computePairwiseInnerProducts() {
+ private VectorIterable computePairwiseInnerProducts() {
return OrthonormalityVerifier.pairwiseInnerProducts(eigensToVerify);
}
- public void saveCleanEigens(List<Map.Entry<MatrixSlice,EigenStatus>>
prunedEigenMeta) throws IOException {
- Path path = new Path(outPath, "largestCleanEigens");
- Configuration conf = getConf();
+ private void saveCleanEigens(List<Map.Entry<MatrixSlice, EigenStatus>>
prunedEigenMeta) throws IOException {
+ Path path = new Path(outPath, LARGEST_CLEAN_EIGENS);
FileSystem fs = FileSystem.get(conf);
SequenceFile.Writer seqWriter = new SequenceFile.Writer(fs, conf, path,
IntWritable.class, VectorWritable.class);
IntWritable iw = new IntWritable();
- for (Map.Entry<MatrixSlice,EigenStatus> pruneSlice : prunedEigenMeta) {
+ for (Map.Entry<MatrixSlice, EigenStatus> pruneSlice : prunedEigenMeta) {
MatrixSlice s = pruneSlice.getKey();
EigenStatus meta = pruneSlice.getValue();
- EigenVector ev = new EigenVector((DenseVector)s.vector(),
- meta.getEigenValue(),
- Math.abs(1 - meta.getCosAngle()),
- s.index());
+ EigenVector ev = new EigenVector((DenseVector) s.vector(),
meta.getEigenValue(), Math.abs(1 - meta.getCosAngle()), s.index());
log.info("appending {} to {}", ev, path);
VectorWritable vw = new VectorWritable(ev);
iw.set(s.index());
@@ -162,16 +203,16 @@ public class EigenVerificationJob extend
seqWriter.close();
}
- public List<Map.Entry<MatrixSlice,EigenStatus>>
pruneEigens(Map<MatrixSlice,EigenStatus> eigenMetaData) {
- List<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta = new
ArrayList<Map.Entry<MatrixSlice,EigenStatus>>();
+ private List<Map.Entry<MatrixSlice, EigenStatus>>
pruneEigens(Map<MatrixSlice, EigenStatus> eigenMetaData) {
+ List<Map.Entry<MatrixSlice, EigenStatus>> prunedEigenMeta = new
ArrayList<Map.Entry<MatrixSlice, EigenStatus>>();
- for (Map.Entry<MatrixSlice,EigenStatus> entry : eigenMetaData.entrySet()) {
+ for (Map.Entry<MatrixSlice, EigenStatus> entry : eigenMetaData.entrySet())
{
if (Math.abs(1 - entry.getValue().getCosAngle()) < maxError &&
entry.getValue().getEigenValue() > minEigenValue) {
prunedEigenMeta.add(entry);
}
}
- Collections.sort(prunedEigenMeta, new
Comparator<Map.Entry<MatrixSlice,EigenStatus>>() {
+ Collections.sort(prunedEigenMeta, new Comparator<Map.Entry<MatrixSlice,
EigenStatus>>() {
@Override
public int compare(Map.Entry<MatrixSlice, EigenStatus> e1,
Map.Entry<MatrixSlice, EigenStatus> e2) {
return e1.getKey().index() - e2.getKey().index();
@@ -180,7 +221,7 @@ public class EigenVerificationJob extend
return prunedEigenMeta;
}
- public Map<MatrixSlice,EigenStatus> verifyEigens() {
+ private Map<MatrixSlice, EigenStatus> verifyEigens() {
Map<MatrixSlice, EigenStatus> eigenMetaData = new HashMap<MatrixSlice,
EigenStatus>();
for (MatrixSlice slice : eigensToVerify) {
@@ -192,20 +233,18 @@ public class EigenVerificationJob extend
private void prepareEigens(Path eigenInput, boolean inMemory) {
DistributedRowMatrix eigens = new DistributedRowMatrix(eigenInput, tmpOut,
1, 1);
- eigens.configure(new JobConf(getConf()));
+ eigens.configure(conf);
if (inMemory) {
List<Vector> eigenVectors = new ArrayList<Vector>();
for (MatrixSlice slice : eigens) {
eigenVectors.add(slice.vector());
}
- eigensToVerify = new SparseRowMatrix(new int[] {eigenVectors.size(),
eigenVectors.get(0).size()},
- eigenVectors.toArray(new
Vector[eigenVectors.size()]),
- true,
- true);
+ eigensToVerify = new SparseRowMatrix(new int[] { eigenVectors.size(),
eigenVectors.get(0).size() }, eigenVectors
+ .toArray(new Vector[eigenVectors.size()]), true, true);
} else {
eigensToVerify = eigens;
- }
+ }
}
public static void main(String[] args) throws Exception {
Modified: mahout/trunk/utils/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java?rev=996599&r1=996598&r2=996599&view=diff
==============================================================================
--- mahout/trunk/utils/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java (original)
+++ mahout/trunk/utils/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java Mon Sep 13 16:48:23 2010
@@ -56,6 +56,7 @@ import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.hadoop.DistributedRowMatrix;
import org.apache.mahout.math.hadoop.decomposer.DistributedLanczosSolver;
+import org.apache.mahout.math.hadoop.decomposer.EigenVerificationJob;
import org.apache.mahout.utils.MahoutTestCase;
import org.apache.mahout.utils.clustering.ClusterDumper;
import org.apache.mahout.utils.vectors.TFIDF;
@@ -252,6 +253,10 @@ public final class TestClusterDumper ext
Path testData = getTestTempDirPath("testdata");
int sampleDimension = sampleData.get(0).get().size();
solver.run(testData, tmp, eigenvectors, sampleData.size(),
sampleDimension, false, desiredRank);
+
+ new EigenVerificationJob().run(testData, eigenvectors, output, tmp, 0.5,
0.0, true, null);
+ Path cleanEigenvectors = new Path(output,
EigenVerificationJob.LARGEST_CLEAN_EIGENS);
+
// build in-memory data matrix A
Matrix a = new DenseMatrix(sampleData.size(), sampleDimension);
int i = 0;
@@ -260,8 +265,8 @@ public final class TestClusterDumper ext
}
// extract the eigenvectors into P
Matrix p = new DenseMatrix(39, desiredRank - 1);
- FileSystem fs = FileSystem.get(eigenvectors.toUri(), conf);
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, eigenvectors,
conf);
+ FileSystem fs = FileSystem.get(cleanEigenvectors.toUri(), conf);
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs,
cleanEigenvectors, conf);
try {
Writable key =
reader.getKeyClass().asSubclass(Writable.class).newInstance();
Writable value =
reader.getValueClass().asSubclass(Writable.class).newInstance();
@@ -317,8 +322,11 @@ public final class TestClusterDumper ext
int sampleDimension = sampleData.get(0).get().size();
solver.run(testData, tmp, eigenvectors, sampleData.size(),
sampleDimension, false, desiredRank);
+ new EigenVerificationJob().run(testData, eigenvectors, output, tmp, 0.5,
0.0, false, null);
+ Path cleanEigenvectors = new Path(output,
EigenVerificationJob.LARGEST_CLEAN_EIGENS);
+
// now multiply the testdata matrix and the eigenvector matrix
- DistributedRowMatrix svdT = new DistributedRowMatrix(eigenvectors, tmp,
desiredRank - 1, sampleDimension);
+ DistributedRowMatrix svdT = new DistributedRowMatrix(cleanEigenvectors,
tmp, desiredRank - 1, sampleDimension);
JobConf conf = new JobConf(config);
svdT.configure(conf);
DistributedRowMatrix a = new DistributedRowMatrix(testData, tmp,
sampleData.size(), sampleDimension);